💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
## 一、概述 比如轮询大量的记录,这种job,可能会一次轮询耗时数个小时到数天,那么,对于业务来说,是无法接受的;这个时候,平台支持通过引入派生多线程的并发解决方案; ## 二、解决方案 通过动态派生多线程来实现;平台中,通过提供RayCaller和RayThread来实现(实际上,除了调度服务,所有需要动态多线程的场景,这两个平台提供的基础设施类,都能提供帮助); 下面通过一个例子来说明; ``` @Component public class MultiThreadJobSample extends RayPeriodScheduleJob { @Override public boolean enabled() { return true; } @Override public String jobDesc() { return "多线程验证"; } @Override public long periodInSeconds() { return 1 * minute; } @Override public long initialDelayInSeconds() { return 1 * minute; } @Override public String allowedIps() { return all; } @Autowired private RunCaller runCaller; @Override public void doInRayPeriodJob(SysJobExecution sysjobexecution) { int totalSize=10000; int sizePerThread=1000; List<Long> idList = new ArrayList(); for (long i = 0; i < totalSize; i++) { idList.add(i); } // 将待轮询的大列表,依据条目进行分组,每个分组规划一个线程执行 List<List<Long>> threadSplitList = ListUtil.splitListByPerListSize(idList, sizePerThread); int threadNum = threadSplitList.size(); runCaller.call(logger, threadNum, threadSplitList, sysjobexecution); } @Component public static class RunCaller extends RayCaller { @Override public void createThreadToExecutionTask(Logger logger, int threadNum, ExecutorService executor, CountDownLatch currentMainThreadLatch) { // 依据分配好的线程轮询对象,每个线程分配均匀的条目轮询对象列表 for (int i = 0; i < threadNum; i++) { List<List<Long>> threadSplitList = getMainTranferdBusinessObject(0); List<Long> listPer = threadSplitList.get(i); Runnable task = new RunThread(logger, i+ 1, currentMainThreadLatch, appendToEndOfMainTranferdAsCallerTransferedBusinessObjects(listPer)); executor.execute(task); } } } public static class RunThread extends RayThread { public RunThread(Logger logger, int threadIndex, CountDownLatch callerCountDownLatch, Object[] callerTranferdBusinessObjects) { super(logger, threadIndex, callerCountDownLatch, callerTranferdBusinessObjects); } @Override public void doBusinessInThread(Object[] callerTranferdBusinessObjects) { logger.error("{}开始运行", Thread.currentThread().getName()); SysJobExecution sysjobexecution = getCallerTranferdBusinessObject(1); List<Long> threadSplitList = getCallerTranferdBusinessObject(2); StringBuilder errorMessage = getCallerTranferdBusinessObject(3); // 针对当前线程需要需要处理的轮询对象,进行轮询 for (int i = 0; i < threadSplitList.size(); i++) { try { //可能抛异常的代码 } catch(Exception ex) { errorMessage.append(ExceptionUtil.getFullStackTraceAsString(e)); } logger.error("处理了第{}条", threadSplitList.get(i)); } } } } ``` >[danger] RunThread 中的doBusinessInThread,才是真正执行业务的地方;