企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # ThreadPoolExecutor 无论创建那种线程池 必须要调用ThreadPoolExecutor 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ~~~ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ~~~ ~~~ corePoolSize: 线程池维护线程的最少数量,核心线程池的大小,在线程池被创建之后,其实里面是没有线程的。(当然,调用prestartAllCoreThreads()或者prestartCoreThread()方法会预创建线程,而不用等着任务的到来)。当有任务进来的时候,才会创建线程。当线程池中的线程数量达到corePoolSize之后,就把任务放到 缓存队列当中。(就是 workQueue) maximumPoolSize:线程池维护线程的最大数量 ,它标志着这个线程池的最大线程数量。如果没有最大数量,当创建的线程数量达到了 某个极限值,到最后内存肯定就爆掉了 keepAliveTime: 线程池维护线程所允许的空闲时间,超过这个时间就被终止了。默认情况下,只有 线程池中线程数量 大于 corePoolSize时,keepAliveTime值才会起作用。也就说说,只有在线程池线程数量超出corePoolSize了。我们才会把超时的空闲线程给停止掉。否则就保持线程池中有 corePoolSize 个线程就可以了。 unit: 线程池维护线程所允许的空闲时间的单位,参数keepAliveTime的时间单位,就是 TimeUnit类当中的几个属性 workQueue: 线程池所使用的缓冲队列,用来存储待执行任务的队列,不同的线程池它的队列实现方式不同(因为这关系到排队策略的问题)比如有以下几种 ArrayBlockingQueue:基于数组的队列,创建时需要指定大小。 LinkedBlockingQueue:基于链表的队列,如果没有指定大小,则默认值是 Integer.MAX_VALUE。(newFixedThreadPool和newSingleThreadExecutor使用的就是这种队列)。 SynchronousQueue:这种队列比较特殊,因为不排队就直接创建新线程把任务提交了。(newCachedThreadPool使用的就是这种队列)。 threadFactory:线程工厂,用来创建线程 handler: 线程池对拒绝任务的处理策略,绝执行任务时的策略,一般来讲有以下四种策略, (1) ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。 (2) ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用 execute方法的线程执行该任务。 (3) ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。 (4) ThreadPoolExecutor.DiscardPolicy,丢弃任务,不过也不抛出异常。 ~~~ 一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。 当一个任务通过execute(Runnable)方法欲添加到线程池时: * 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 * 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 * 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 * 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 也就是:处理任务的优先级为: 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 unit(线程池维护线程所允许的空闲时间的单位)可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue handler有四个选择: ~~~ ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常 ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务 ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务 ~~~ 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。 # 方法 ~~~ shutdown(),平滑的关闭线程池.(如果还有未执行完的任务,就等待它们执行完) shutdownNow(),简单粗暴的关闭线程池.(没有执行完的任务也直接关闭) setCorePoolSize(),设置/更改核心池的大小. setMaximumPoolSize(),设置/更改线程池中最大线程的数量限制. getActiveCount() 活跃的线程数 getCorePoolSize() 核心的线程数 getPoolSize() 线程池的大小 getQueue().size() 队列大小 ~~~ # execute 和 submit execute():Executor中声明的方法,向线程池提交一个任务,交由线程池去执行,没有返回值 submit() :ExecutorService中声明的方法。向线程池提交一个任务,交由线程池去执行,可以接受 ( Callable.call() )回调函数的返回值,适用于需要处理返回着或者异常的业务场景,实际上还是调用的execute()方法,只不过它利用了 Future 来获取任务执行结果 # schedule schedule: 延时执行任务 scheduleAtFixedRate : 以固定频率来执行一个任务,按照上一次任务的发起时间计算下一次任务的开始时间 scheduleWithFixedDelay : 以上一次任务的结束时间计算下一次任务的开始时间,在你不能预测调度任务的执行时长时是很有用 # 线程池的关闭 shutdown() :不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务 shutdownNow() :立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务 # 任务队列 BlockingQueue BlockingQueue workQueue 取值 ArrayBlockingQueue :有界的数组队列,先进先出,此队列创建时必须指定大小 SynchronousQueue : 同步的阻塞队列,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务 LinkedBlockingQueue :基于链表的先进先出队列,可支持有界/无界的队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE PriorityBlockingQueue :优先队列,可以针对任务排序 # 自定义线程池 ~~~ package com.study; import java.util.Locale; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ExecutorServiceHelper { /** * 获取活跃的cpu数量 */ private static int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors(); private final static BlockingQueue<Runnable> queue; private final static long KEEP_ALIVE_TIME = 3L; private final static TimeUnit KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS; private static ThreadFactory mThreadFactory; static { /** * 基于链表的队列,如果没有指定大小,则默认值是 Integer.MAX_VALUE */ queue = new LinkedBlockingQueue<Runnable>(); //默认的工厂方法将新创建的线程命名为:pool-[虚拟机中线程池编号]-thread-[线程编号] //mThreadFactory= Executors.defaultThreadFactory(); mThreadFactory = new NamedThreadFactory(); //System.out.println("NUMBER_OF_CORES:"+NUMBER_OF_CORES); } public static void execute(Runnable runnable) { if (runnable == null) { return; } /** * 1.当线程池小于 corePoolSize 时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 * 2.当线程池达到 corePoolSize 时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行 * 3.当 workQueue 已满,且 maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务 * 4.当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理 * 5.当线程池中超过 corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程 * 6.当设置 allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭 **/ /** maximumPoolSize 推荐取值 如果是 CPU 密集型任务,就需要尽量压榨CPU,参考值可以设为 NUMBER_OF_CORES + 1 或 NUMBER_OF_CORES + 2 如果是 IO 密集型任务,参考值可以设置为 NUMBER_OF_CORES * 2 */ ExecutorService executorService = new ThreadPoolExecutor(NUMBER_OF_CORES, NUMBER_OF_CORES * 2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT, queue, mThreadFactory); executorService.execute(runnable); } private static class NamedThreadFactory implements ThreadFactory { private final AtomicInteger threadNumberAtomicInteger = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, String.format(Locale.CHINA, "%s%d", "NamedThreadFactory", threadNumberAtomicInteger.getAndIncrement())); /* thread.setDaemon(true);//是否是守护线程 thread.setPriority(Thread.NORM_PRIORITY);//设置优先级 1~10 有3个常量 默认 Thread.MIN_PRIORITY*/ return thread; } } } ~~~ 测试下 ~~~ class TestThread { public static void main(String[] args) { ExecutorServiceHelper.execute(new Runnable() { @Override public void run() { //do something System.out.println("1"); } }); } } ~~~