Browse Source

Merge branch 'cpq-dev' of wangfan/adweb3-server into master

chenpeiqing 3 months ago
parent
commit
676200f8e2

+ 42 - 48
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/mq/EnquiryReceiver.java

@@ -29,7 +29,7 @@ import static org.jeecg.modules.adweb.enquiry.service.impl.AdwebEnquiryServiceIm
  */
 @Slf4j
 @RabbitComponent(value = "enquiryReceiver")
-public class EnquiryReceiver extends BaseRabbiMqHandler<EnquiryDTO> {
+public class EnquiryReceiver extends BaseRabbiMqHandler<List<EnquiryDTO>> {
 
     @Autowired
     private IAdwebSiteService adwebSiteService;
@@ -43,57 +43,51 @@ public class EnquiryReceiver extends BaseRabbiMqHandler<EnquiryDTO> {
     @RabbitHandler
     // ackMode需要设置为MANUAL - BaseRabbiMqHandler.onMessage()有channel.basicAck()操作
     @RabbitListener(queues = "site_enquiry", ackMode = "MANUAL")
-    public void onMessage(
-            EnquiryDTO enquiryDTO,
-            Channel channel,
-            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
-        super.onMessage(
-                enquiryDTO,
-                deliveryTag,
-                channel,
-                new MqListener<EnquiryDTO>() {
-                    @Override
-                    public void handler(EnquiryDTO enquiry, Channel channel) {
-                        // 业务处理
-                        log.info("收到MQ消息 {}", enquiry);
+    public void onMessage(List<EnquiryDTO> enquiries, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
+        super.onMessage(enquiries, deliveryTag, channel, new MqListener<List<EnquiryDTO>>() {
+            @Override
+            public void handler(List<EnquiryDTO> enquiries, Channel channel) {
+                // 业务处理
+                log.info("收到MQ消息 {}", enquiries);
 
-                        if (adwebRedisUtil.hasKey(EnquiryUniqueKey + enquiry.getSiteHost())) {
-                            // 消息内容唯一性判断
-                            List<String> enquiryUniqueIds = (List<String>) adwebRedisUtil.get(EnquiryUniqueKey + enquiry.getSiteHost());
-                            // 如果该消息内容原数据中已存在,则将该重复消息踢出队列
-                            if (enquiryUniqueIds.contains(enquiry.getSiteHost() + enquiry.getRecordId())) {
-                                log.info("该询盘已经消费过,从队列中清除:{}", enquiry.getRecordId());
-                                try {
-                                    channel.basicAck(deliveryTag, false);
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            }
-                        }
-
-
-                        // 查看发布当前消息的站点是否是 adweb3 平台中绑定的是否是有效站点, 如果不是则直接确认消息,不做业务处理
-                        List<AdwebSite> adwebSites =
-                                adwebSiteService.list(
-                                        new LambdaQueryWrapper<AdwebSite>()
-                                                .like(AdwebSite::getDomain, enquiry.getSiteHost())
-                                                .ne(AdwebSite::getStatus, 0));
+                for (EnquiryDTO enquiry : enquiries) {
+                    // 查看发布当前消息的站点是否是 adweb3 平台中绑定的是否是有效站点, 如果不是则直接确认消息,不做业务处理
+                    List<AdwebSite> adwebSites =
+                            adwebSiteService.list(
+                                    new LambdaQueryWrapper<AdwebSite>()
+                                            .like(AdwebSite::getDomain, enquiry.getSiteHost())
+                                            .ne(AdwebSite::getStatus, 0));
 
-                        if (adwebSites.isEmpty()) {
-                            log.error(
-                                    "站点:{} 不是adweb3的有效绑定站点,MQ消息确认失败",
-                                    enquiry.getSiteHost());
-                            return;
+                    if (adwebSites.isEmpty()) {
+                        log.error(
+                                "站点:{} 不是adweb3的有效绑定站点,MQ消息确认失败",
+                                enquiry.getSiteHost());
+                        return;
+                    }
+                    
+                    if (adwebRedisUtil.hasKey(EnquiryUniqueKey + enquiry.getSiteHost())) {
+                        // 消息内容唯一性判断
+                        List<String> enquiryUniqueIds = (List<String>) adwebRedisUtil.get(EnquiryUniqueKey + enquiry.getSiteHost());
+                        // 如果该消息内容原数据中已存在,则将该重复消息踢出队列
+                        if (enquiryUniqueIds.contains(enquiry.getSiteHost() + enquiry.getRecordId())) {
+                            log.info("该询盘已经消费过,从队列中清除:{}", enquiry.getRecordId());
+                            try {
+                                channel.basicAck(deliveryTag, false);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
                         }
+                    }
 
-                        try {
-                            // 询盘入库
-                            adwebEnquiryService.addEnquiry(
-                                    enquiry, enquiry.getPluginName(), adwebSites);
-                        } catch (Exception e) {
-                            log.error("询盘入库异常:{}", e.getMessage());
-                        }
+                    try {
+                        // 询盘入库
+                        adwebEnquiryService.addEnquiry(
+                                enquiry, enquiry.getPluginName(), adwebSites);
+                    } catch (Exception e) {
+                        log.error("询盘入库异常:{}", e.getMessage());
                     }
-                });
+                }
+            }
+        });
     }
 }