Browse Source

Changes 修改数据业务

cyan 1 month ago
parent
commit
32eb85e848

+ 4 - 0
jeecg-boot/jeecg-boot-base-core/pom.xml

@@ -285,6 +285,10 @@
 			<artifactId>oapi-java-sdk</artifactId>
 			<version>release-V4-2.3.0</version>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-data-mongodb</artifactId>
+		</dependency>
 	</dependencies>
 
 </project>

+ 15 - 0
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/dto/GroupMsgResult.java

@@ -0,0 +1,15 @@
+package org.jeecg.modules.okki.wechatinnergroup.dto;
+
+import lombok.Data;
+
+/**
+ * @author 12453
+ */
+@Data
+public class GroupMsgResult {
+    private Integer siteId;
+
+    private Long maxSeq;
+
+    private String idOfMaxSeq;
+}

+ 2 - 0
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/entity/OkkiShopWechatInnerGroupMsg.java

@@ -11,6 +11,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
 import org.jeecgframework.poi.excel.annotation.Excel;
+import org.springframework.data.mongodb.core.mapping.Document;
 import org.springframework.format.annotation.DateTimeFormat;
 
 import java.io.Serializable;
@@ -26,6 +27,7 @@ import java.io.Serializable;
 @Accessors(chain = true)
 @EqualsAndHashCode(callSuper = false)
 @ApiModel(value = "okki_shop_wechat_inner_group_msg对象", description = "okki_shop_wechat_inner_group_msg")
+@Document(collection = "okki_shop_wechat_inner_group_msg")
 public class OkkiShopWechatInnerGroupMsg implements Serializable {
     private static final long serialVersionUID = 1L;
 

+ 3 - 1
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/job/SyncWechatInnerGroupMsgJob.java

@@ -28,7 +28,9 @@ public class SyncWechatInnerGroupMsgJob implements Job {
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         long start = System.currentTimeMillis();
         log.info(String.format("开始同步站点-【企业微信-内部群】消息-普通定时任务 SyncWechatInnerGroupMsgJob !  时间:" + DateUtils.now()));
-        okkiShopWechatInnerGroupMsgService.getSiteWechatInnerMsgMaxSeq(this.parameter);
+        okkiShopWechatInnerGroupMsgService.getSiteWechatInnerMsgMaxSeqByMysql(this.parameter);
+
+        okkiShopWechatInnerGroupMsgService.getSiteWechatInnerMsgMaxSeqByMongodb(this.parameter);
         long end = System.currentTimeMillis();
         log.info("请求同步站点-【企业微信-内部群】消息-普通定时任务 SyncWechatInnerGroupMsgJob !结束,耗时:{}s", (end - start) / 1000);
     }

+ 9 - 1
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/service/IOkkiShopWechatInnerGroupMsgService.java

@@ -16,5 +16,13 @@ public interface IOkkiShopWechatInnerGroupMsgService extends IService<OkkiShopWe
      *
      * @param siteId
      */
-    public void getSiteWechatInnerMsgMaxSeq(String siteId);
+    public void getSiteWechatInnerMsgMaxSeqByMysql(String siteId);
+
+
+    /**
+     * 获取【企业微信-内部群】消息
+     *
+     * @param siteId
+     */
+    public void getSiteWechatInnerMsgMaxSeqByMongodb(String siteId);
 }

+ 54 - 0
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/service/WechatInnerGroupMongodbRepository.java

@@ -0,0 +1,54 @@
+package org.jeecg.modules.okki.wechatinnergroup.service;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.modules.okki.wechatinnergroup.dto.GroupMsgResult;
+import org.jeecg.modules.okki.wechatinnergroup.entity.OkkiShopWechatInnerGroupMsg;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
+import org.springframework.data.mongodb.core.aggregation.AggregationResults;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * @author 12453
+ */
+@Slf4j
+@Service
+public class WechatInnerGroupMongodbRepository{
+
+    @Resource
+    private MongoTemplate mongoTemplate;
+
+    /**
+     * 保存群聊消息
+     * @param shopWechatInnerGroupMsgs
+     */
+    public void saveAll(List<OkkiShopWechatInnerGroupMsg> shopWechatInnerGroupMsgs){
+        shopWechatInnerGroupMsgs.forEach(c->{
+            mongoTemplate.save(c);
+        });
+    }
+
+    /**
+     * 查询最大seq
+     * @return
+     */
+    public AggregationResults<GroupMsgResult> findMaxSeqBySiteId(Integer siteId) {
+        Aggregation aggregation = Aggregation.newAggregation(
+                Aggregation.match(Criteria.where("siteId").is(siteId)),
+
+//                Aggregation.sort(Sort.by(Sort.Order.desc("seq"))),
+
+                Aggregation.group("siteId")
+                        .max("seq").as("maxSeq")
+                        .max("_id").as("idOfMaxSeq")
+                        .first("siteId").as("siteId")
+        );
+        return mongoTemplate.aggregate(aggregation, "okki_shop_wechat_inner_group_msg", GroupMsgResult.class);
+    }
+
+}

+ 53 - 5
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/okki/wechatinnergroup/service/impl/OkkiShopWechatInnerGroupMsgServiceImpl.java

@@ -1,17 +1,25 @@
 package org.jeecg.modules.okki.wechatinnergroup.service.impl;
 
+import cn.hutool.core.collection.CollectionUtil;
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
+import org.jeecg.common.util.FastJsonUtil;
 import org.jeecg.common.util.ThreadPoolUtil;
+import org.jeecg.modules.okki.site.entity.OkkiSite;
+import org.jeecg.modules.okki.site.service.IOkkiSiteService;
 import org.jeecg.modules.okki.utils.HttpClientUtils;
 import org.jeecg.modules.okki.wechatgroup.dto.WechatInnerGroupMsgRespDto;
+import org.jeecg.modules.okki.wechatinnergroup.dto.GroupMsgResult;
 import org.jeecg.modules.okki.wechatinnergroup.entity.OkkiShopWechatInnerGroupMsg;
 import org.jeecg.modules.okki.wechatinnergroup.mapper.OkkiShopWechatInnerGroupMsgMapper;
 import org.jeecg.modules.okki.wechatinnergroup.service.IOkkiShopWechatInnerGroupMsgService;
+import org.jeecg.modules.okki.wechatinnergroup.service.WechatInnerGroupMongodbRepository;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.mongodb.core.aggregation.AggregationResults;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
@@ -36,6 +44,12 @@ public class OkkiShopWechatInnerGroupMsgServiceImpl extends ServiceImpl<OkkiShop
     @Resource
     private OkkiShopWechatInnerGroupMsgMapper okkiShopWechatInnerGroupMsgMapper;
 
+    @Resource
+    private WechatInnerGroupMongodbRepository wechatInnerGroupMongodbRepository;
+
+    @Resource
+    private IOkkiSiteService okkiSiteService;
+
     /**
      * 获取站点信息
      *
@@ -43,7 +57,7 @@ public class OkkiShopWechatInnerGroupMsgServiceImpl extends ServiceImpl<OkkiShop
      * @return
      */
     @Override
-    public void getSiteWechatInnerMsgMaxSeq(String siteId) {
+    public void getSiteWechatInnerMsgMaxSeqByMysql(String siteId) {
         List<OkkiShopWechatInnerGroupMsg> shopWechatInnerGroupMsgs = okkiShopWechatInnerGroupMsgMapper.getSiteWechatInnerMsgList(siteId);
         log.info("一共有:{}个站点,需要【企业微信-内部群】消息", shopWechatInnerGroupMsgs.size());
         for (int i = 0; i < shopWechatInnerGroupMsgs.size(); i++) {
@@ -52,7 +66,35 @@ public class OkkiShopWechatInnerGroupMsgServiceImpl extends ServiceImpl<OkkiShop
                 OkkiShopWechatInnerGroupMsg okkiShopWechatInnerGroupMsg = shopWechatInnerGroupMsgs.get(finalI);
                 int maxSeq = okkiShopWechatInnerGroupMsg.getSeq() == 0 ? 1 : okkiShopWechatInnerGroupMsg.getSeq();
                 log.info("开始处理第{}个,站点ID为:{},seq为:{},【企业微信-内部群】消息", finalI + 1, okkiShopWechatInnerGroupMsg.getSiteId(), maxSeq);
-                saveWechatInnerGroupMsg(String.valueOf(okkiShopWechatInnerGroupMsg.getSiteId()), maxSeq);
+                saveWechatInnerGroupMsg(String.valueOf(okkiShopWechatInnerGroupMsg.getSiteId()), maxSeq,"mysql");
+            });
+        }
+    }
+
+    @Override
+    public void getSiteWechatInnerMsgMaxSeqByMongodb(String siteId) {
+        QueryWrapper<OkkiSite> queryWrapper = new QueryWrapper<>();
+        if(StringUtils.isNotBlank(siteId)){
+            queryWrapper.eq("site_id",siteId);
+        }
+        queryWrapper.eq("del_flag",0);
+        List<OkkiSite> okkiSites = okkiSiteService.list(queryWrapper);
+
+        log.info("一共有:{}个站点,需要【企业微信-内部群】消息", okkiSites.size());
+        for (int i = 0; i < okkiSites.size(); i++) {
+            int finalI = i;
+            ThreadPoolUtil.execute(() -> {
+                AggregationResults<GroupMsgResult> aggregationResults = wechatInnerGroupMongodbRepository.findMaxSeqBySiteId(Integer.valueOf(okkiSites.get(finalI).getSiteId()));
+                log.info("aggregationResults:{}",FastJsonUtil.toJSONString(aggregationResults));
+
+                long maxSeq = 1;
+                if(!CollectionUtil.isEmpty(aggregationResults)){
+                    GroupMsgResult groupMsgResult = aggregationResults.getMappedResults().get(0);
+                    maxSeq = groupMsgResult.getMaxSeq();
+                }
+
+                log.info("开始处理第{}个,站点ID为:{},seq为:{},【企业微信-内部群】消息", finalI + 1, okkiSites.get(finalI).getSiteId(), maxSeq);
+                saveWechatInnerGroupMsg(okkiSites.get(finalI).getSiteId(), Math.toIntExact(maxSeq),"mongodb");
             });
         }
     }
@@ -63,8 +105,9 @@ public class OkkiShopWechatInnerGroupMsgServiceImpl extends ServiceImpl<OkkiShop
      * @param siteId
      * @param minSeq
      */
-    public void saveWechatInnerGroupMsg(String siteId, Integer minSeq) {
+    public void saveWechatInnerGroupMsg(String siteId, Integer minSeq,String type) {
         List<OkkiShopWechatInnerGroupMsg> wechatInnerGroupMsgList = sendWechatInnerGroupMsgRequest(siteId, minSeq);
+        log.info("获取站点ID为:{},seq为:{},【企业微信-内部群】消息 条数一共有:{}", siteId, minSeq, wechatInnerGroupMsgList.size());
         List<OkkiShopWechatInnerGroupMsg> shopWechatInnerGroupMsgs = new ArrayList<>();
         if (!CollectionUtils.isEmpty(wechatInnerGroupMsgList)) {
             wechatInnerGroupMsgList.forEach(c -> {
@@ -84,11 +127,16 @@ public class OkkiShopWechatInnerGroupMsgServiceImpl extends ServiceImpl<OkkiShop
                 shopWechatInnerGroupMsg.setUpdateTime(c.getUpdateTime());
                 shopWechatInnerGroupMsgs.add(shopWechatInnerGroupMsg);
             });
-            this.saveBatch(shopWechatInnerGroupMsgs);
+
+            if("mongodb".equals(type)){
+                wechatInnerGroupMongodbRepository.saveAll(shopWechatInnerGroupMsgs);
+            }else{
+                this.saveBatch(shopWechatInnerGroupMsgs);
+            }
 
             //获取下一页数据
             OkkiShopWechatInnerGroupMsg lastWechatInnerGroupMsg = shopWechatInnerGroupMsgs.get(shopWechatInnerGroupMsgs.size() - 1);
-            saveWechatInnerGroupMsg(siteId, lastWechatInnerGroupMsg.getSeq());
+            saveWechatInnerGroupMsg(siteId, lastWechatInnerGroupMsg.getSeq(),type);
         }
     }
 

+ 4 - 0
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/application-prod.yml

@@ -168,6 +168,10 @@ spring:
           #username: root
           #password: root
           #driver-class-name: com.mysql.cj.jdbc.Driver
+  data:
+    mongodb:
+      uri: mongodb://ok_mongo:DPTz06GIsMS59XYu@100.21.172.224:27017/ok_chat_group
+      authentication-database: admin
   #redis 配置
   redis:
     database: 0