Browse Source

增加springboot启动时的线程池线程数量,控制sh远程脚本的输出,防止缓冲区满载,线程卡主

chenlei1231 2 months ago
parent
commit
312c83540d

+ 63 - 25
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/adweb/common/util/ShellSSH2Util.java

@@ -1,5 +1,6 @@
 package org.jeecg.modules.adweb.common.util;
 
+import ch.ethz.ssh2.ChannelCondition;
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
 import ch.ethz.ssh2.StreamGobbler;
@@ -9,6 +10,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * 远程Shell执行工具类
@@ -262,27 +265,59 @@ public class ShellSSH2Util {
      */
     private static String execmd(Connection connection, String cmd, StdoutListener listener) {
         String result = "";
+        Session session = null;
         try {
             if (connection != null) {
-                Session session = connection.openSession();// 打开一个会话
+                session = connection.openSession();// 打开一个会话
                 session.execCommand(cmd);// 执行命令
-                result = processStdout(session.getStdout(), DEFAULT_CHART, listener);
-                listener.stdout("命令标准输出结果:\n");
-                listener.stdout(result);
-                // 如果为得到标准输出为空,说明脚本执行出错了
-                if (StringUtils.isBlank(result)) {
-                    listener.stderr("得到标准输出为空, 链接conn:" + connection + ",执行的命令:" + cmd);
-                    result = processStdout(session.getStderr(), DEFAULT_CHART, listener);
-                    listener.stderr("得到标准输出为空, 链接conn:" + connection + ",执行的命令:" + cmd + "; 错误信息:" + result);
-                } else {
-                    listener.stdout("执行命令成功, 链接conn:" + connection + ",执行的命令:" + cmd);
+
+                // 使用线程池来管理输出读取线程
+                ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+                // 读取标准输出
+                Session finalSession = session;
+                executorService.submit(() -> {
+                    String stdoutResult = processStdout(finalSession.getStdout(), DEFAULT_CHART, listener);
+                    listener.stdout("命令标准输出结果:\n");
+                    listener.stdout(stdoutResult);
+                    if (StringUtils.isBlank(stdoutResult)) {
+                        listener.stderr("得到标准输出为空, 链接conn:" + connection + ",执行的命令:" + cmd);
+
+                        listener.stderr("得到标准输出为空, 链接conn:" + connection + ",执行的命令:" + cmd + "; 错误信息:" + stdoutResult);
+                    } else {
+                        listener.stdout("执行命令成功, 链接conn:" + connection + ",执行的命令:" + cmd);
+                    }
+                });
+
+                // 读取标准错误
+                Session finalSession1 = session;
+                executorService.submit(() -> {
+                    processStdout(finalSession1.getStderr(), DEFAULT_CHART, listener);
+                });
+
+                // 等待所有输出读取线程完成
+                executorService.shutdown();
+                while (!executorService.isTerminated()) {
+                    // 可以在这里设置一个超时,防止无限等待
+                    Thread.sleep(300);
                 }
-                session.close();
-                connection.close();
+
+                // 等待脚本执行完毕
+                session.waitForCondition(ChannelCondition.CLOSED | ChannelCondition.EXIT_STATUS, 0);
             }
         } catch (IOException e) {
             listener.stderr("执行命令失败,链接conn:" + connection + ",执行的命令:" + cmd + " " + e);
             log.error("执行命令失败,链接conn:{},执行的命令:{} {}", connection, cmd, e.getMessage());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // 关闭会话和连接
+            if (session != null) {
+                session.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
         }
         return result;
 
@@ -298,22 +333,25 @@ public class ShellSSH2Util {
     private static String processStdout(InputStream in, String charset, StdoutListener listener) {
         InputStream stdout = new StreamGobbler(in);
         StringBuffer buffer = new StringBuffer();
-        try {
-            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
-            String line = null;
+
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset))) {
+            String line;
+            StringBuilder bufferBuilder = new StringBuilder();
             while ((line = br.readLine()) != null) {
-                buffer.append(line + "\n");
-                listener.stdout(line);
+                bufferBuilder.append(line).append("\n");
+                try {
+                    listener.stdout(line);
+                } catch (Exception e) {
+                    // 记录异常日志或进行其他处理
+                    log.error("Error in listener.stdout: {}", e.getMessage());
+                }
             }
-            br.close();
-        } catch (UnsupportedEncodingException e) {
-            buffer.append("解析脚本出错:" + e.getMessage() + "\n");
-            listener.stderr("解析脚本出错:" + e.getMessage());
-            e.printStackTrace();
+            buffer.append(bufferBuilder);
         } catch (IOException e) {
-            buffer.append("解析脚本出错:" + e.getMessage() + "\n");
+            // 记录异常日志或进行其他处理
+            buffer.append("解析脚本出错:").append(e.getMessage()).append("\n");
             listener.stderr("解析脚本出错:" + e.getMessage());
-            e.printStackTrace();
+            log.error("解析脚本出错:{}", e.getMessage());
         }
         return buffer.toString();
     }

+ 9 - 9
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/adweb/site/service/SelfWebSiteService.java

@@ -290,14 +290,9 @@ public class SelfWebSiteService {
             shellService.exceShell(cmd, new ShellSSH2Util.StdoutListener() {
                 @Override
                 public void stdout(String line) {
-                    log.info("标准 :" + line);
-
-                    if ("ERROR:FILE_NAME IS EXISTS".equals(line)) {
-                        // TODO ----
-                    }
-
+                    log.info("标准 :{}", line);
 
-                    if ("[Creationcompleted]".equals(line)) {
+                    if ("[Creation completed]".equals(line)) {
                         log.info("======网站上线成功======");
                         statusSuccess.append("ok");
                     }
@@ -305,7 +300,7 @@ public class SelfWebSiteService {
 
                 @Override
                 public void stderr(String line) {
-                    log.info("错误 :" + line);
+                    log.info("错误 :{}", line);
                     statusFail.append(line);
                 }
             }, adwebServer);
@@ -330,7 +325,12 @@ public class SelfWebSiteService {
                 String message = "**站点CODE:** " + siteCode + "\n";
                 message += "**临时域名:** " + adwebSite.getDomainDev() + "\n";
                 message += "**服务器IP:** " + adwebSite.getDevServerIp() + "\n";
-                sendAdWebV3FeiShuMsg(title, message);
+                sendAdWebV3FeiShuMsg(title, message); // 发送飞书消息
+
+                String[] userIds = {user.getId()};
+                // 给该站点所属的用户发送消息
+                log.info("发布站点成功通知");
+                webSocketService.sendMessage(userIds, "站点发布完成通知", "您的站点已经发布成功,请在站点管理中查看该站点详细信息", WebsocketConst.CMD_MANAGE_SITE);
 
                 // 创建站点完成-操作
                 adwebSite.setDomain(originDomain);

+ 2 - 2
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/config/AsyncConfig.java

@@ -9,8 +9,8 @@ public class AsyncConfig {
     @Bean(name = "taskExecutor")
     public ThreadPoolTaskExecutor taskExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(4);
-        executor.setMaxPoolSize(8);
+        executor.setCorePoolSize(8);
+        executor.setMaxPoolSize(16);
         executor.setQueueCapacity(500);
         executor.setThreadNamePrefix("Async-");
         executor.initialize();