|
@@ -29,7 +29,7 @@ import static org.jeecg.modules.adweb.enquiry.service.impl.AdwebEnquiryServiceIm
|
|
|
*/
|
|
|
@Slf4j
|
|
|
@RabbitComponent(value = "enquiryReceiver")
|
|
|
-public class EnquiryReceiver extends BaseRabbiMqHandler<EnquiryDTO> {
|
|
|
+public class EnquiryReceiver extends BaseRabbiMqHandler<List<EnquiryDTO>> {
|
|
|
|
|
|
@Autowired
|
|
|
private IAdwebSiteService adwebSiteService;
|
|
@@ -43,57 +43,51 @@ public class EnquiryReceiver extends BaseRabbiMqHandler<EnquiryDTO> {
|
|
|
@RabbitHandler
|
|
|
// ackMode需要设置为MANUAL - BaseRabbiMqHandler.onMessage()有channel.basicAck()操作
|
|
|
@RabbitListener(queues = "site_enquiry", ackMode = "MANUAL")
|
|
|
- public void onMessage(
|
|
|
- EnquiryDTO enquiryDTO,
|
|
|
- Channel channel,
|
|
|
- @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
|
|
- super.onMessage(
|
|
|
- enquiryDTO,
|
|
|
- deliveryTag,
|
|
|
- channel,
|
|
|
- new MqListener<EnquiryDTO>() {
|
|
|
- @Override
|
|
|
- public void handler(EnquiryDTO enquiry, Channel channel) {
|
|
|
- // 业务处理
|
|
|
- log.info("收到MQ消息 {}", enquiry);
|
|
|
+ public void onMessage(List<EnquiryDTO> enquiries, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
|
|
+ super.onMessage(enquiries, deliveryTag, channel, new MqListener<List<EnquiryDTO>>() {
|
|
|
+ @Override
|
|
|
+ public void handler(List<EnquiryDTO> enquiries, Channel channel) {
|
|
|
+ // 业务处理
|
|
|
+ log.info("收到MQ消息 {}", enquiries);
|
|
|
|
|
|
- if (adwebRedisUtil.hasKey(EnquiryUniqueKey + enquiry.getSiteHost())) {
|
|
|
- // 消息内容唯一性判断
|
|
|
- List<String> enquiryUniqueIds = (List<String>) adwebRedisUtil.get(EnquiryUniqueKey + enquiry.getSiteHost());
|
|
|
- // 如果该消息内容原数据中已存在,则将该重复消息踢出队列
|
|
|
- if (enquiryUniqueIds.contains(enquiry.getSiteHost() + enquiry.getRecordId())) {
|
|
|
- log.info("该询盘已经消费过,从队列中清除:{}", enquiry.getRecordId());
|
|
|
- try {
|
|
|
- channel.basicAck(deliveryTag, false);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- // 查看发布当前消息的站点是否是 adweb3 平台中绑定的是否是有效站点, 如果不是则直接确认消息,不做业务处理
|
|
|
- List<AdwebSite> adwebSites =
|
|
|
- adwebSiteService.list(
|
|
|
- new LambdaQueryWrapper<AdwebSite>()
|
|
|
- .like(AdwebSite::getDomain, enquiry.getSiteHost())
|
|
|
- .ne(AdwebSite::getStatus, 0));
|
|
|
+ for (EnquiryDTO enquiry : enquiries) {
|
|
|
+ // 查看发布当前消息的站点是否是 adweb3 平台中绑定的是否是有效站点, 如果不是则直接确认消息,不做业务处理
|
|
|
+ List<AdwebSite> adwebSites =
|
|
|
+ adwebSiteService.list(
|
|
|
+ new LambdaQueryWrapper<AdwebSite>()
|
|
|
+ .like(AdwebSite::getDomain, enquiry.getSiteHost())
|
|
|
+ .ne(AdwebSite::getStatus, 0));
|
|
|
|
|
|
- if (adwebSites.isEmpty()) {
|
|
|
- log.error(
|
|
|
- "站点:{} 不是adweb3的有效绑定站点,MQ消息确认失败",
|
|
|
- enquiry.getSiteHost());
|
|
|
- return;
|
|
|
+ if (adwebSites.isEmpty()) {
|
|
|
+ log.error(
|
|
|
+ "站点:{} 不是adweb3的有效绑定站点,MQ消息确认失败",
|
|
|
+ enquiry.getSiteHost());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (adwebRedisUtil.hasKey(EnquiryUniqueKey + enquiry.getSiteHost())) {
|
|
|
+ // 消息内容唯一性判断
|
|
|
+ List<String> enquiryUniqueIds = (List<String>) adwebRedisUtil.get(EnquiryUniqueKey + enquiry.getSiteHost());
|
|
|
+ // 如果该消息内容原数据中已存在,则将该重复消息踢出队列
|
|
|
+ if (enquiryUniqueIds.contains(enquiry.getSiteHost() + enquiry.getRecordId())) {
|
|
|
+ log.info("该询盘已经消费过,从队列中清除:{}", enquiry.getRecordId());
|
|
|
+ try {
|
|
|
+ channel.basicAck(deliveryTag, false);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- // 询盘入库
|
|
|
- adwebEnquiryService.addEnquiry(
|
|
|
- enquiry, enquiry.getPluginName(), adwebSites);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("询盘入库异常:{}", e.getMessage());
|
|
|
- }
|
|
|
+ try {
|
|
|
+ // 询盘入库
|
|
|
+ adwebEnquiryService.addEnquiry(
|
|
|
+ enquiry, enquiry.getPluginName(), adwebSites);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("询盘入库异常:{}", e.getMessage());
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|