Bladeren bron

质控定时任务

feng 6 maanden geleden
bovenliggende
commit
97e63aa851
1 gewijzigde bestanden met toevoegingen van 93 en 0 verwijderingen
  1. 93 0
      ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/executor/QcRuleExecutor.java

+ 93 - 0
ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/executor/QcRuleExecutor.java

@@ -0,0 +1,93 @@
+package org.ruoyi.chat.executor;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QcRuleExecutor {
+
+    /**
+     * 固定最大线程数 16,可根据需要动态调整
+     * */
+    private static final int MAX_THREADS = 16;
+
+    /**
+     * 线程池全局复用(避免频繁创建销毁)
+     * */
+    private static final ExecutorService executor =
+            new ThreadPoolExecutor(
+                    MAX_THREADS,               // corePoolSize
+                    MAX_THREADS,               // maximumPoolSize
+                    60L, TimeUnit.SECONDS,
+                    // 最大排队任务数
+                    new LinkedBlockingQueue<>(200),
+                    new NamedThreadFactory("qc-rule-thread"),
+                    // 队列满则由调用线程执行
+                    new ThreadPoolExecutor.CallerRunsPolicy()
+            );
+
+    /** 工具方法:并行执行规则质控 */
+    public static <T> List<T> executeRules(List<Callable<T>> ruleTasks) {
+        List<T> results = new ArrayList<>();
+        try {
+            List<Future<T>> futures = executor.invokeAll(ruleTasks);
+            for (Future<T> f : futures) {
+                try {
+                    // 单任务超时 10分钟
+                    results.add(f.get(10, TimeUnit.MINUTES));
+                } catch (Exception e) {
+                    results.add(null);
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        return results;
+    }
+
+    /**
+     * 工具方法:关闭线程池(一般项目中不手动关)
+     * */
+    public static void shutdown() {
+        executor.shutdown();
+    }
+
+    /**
+     * 自定义线程命名
+     * */
+    static class NamedThreadFactory implements ThreadFactory {
+        private final String prefix;
+        private final AtomicInteger idx = new AtomicInteger(1);
+        public NamedThreadFactory(String prefix) { this.prefix = prefix; }
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, prefix + "-" + idx.getAndIncrement());
+            t.setDaemon(true);
+            return t;
+        }
+    }
+
+    /** =================== 示例 =================== */
+    public static void main(String[] args) {
+        // 模拟 30 条规则,每条规则单独执行
+        List<Callable<String>> ruleTasks = new ArrayList<>();
+        for (int i = 1; i <= 30; i++) {
+            int ruleIndex = i;
+            ruleTasks.add(() -> runQcForRule(ruleIndex));
+        }
+
+        List<String> results = executeRules(ruleTasks);
+        results.forEach(System.out::println);
+
+        shutdown();
+    }
+
+    /**
+     * 模拟质控执行逻辑(替换为实际模型调用)
+     * */
+    private static String runQcForRule(int ruleIndex) throws InterruptedException {
+        Thread.sleep(1000 + new Random().nextInt(500));
+        return "规则 " + ruleIndex + " → 线程:" + Thread.currentThread().getName();
+    }
+}
+