ThreadPool
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
1 | /** |
通过Executors
工具类可以创建多种类型的线程池,包括:
FixedThreadPool
:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。核心线程数为n,最大线程数为n,任务队列长度为Interger.MAX_VALUE的LinkedBlockingQueueSingleThreadExecutor
: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。核心线程数、最大线程数都为1,任务队列长度为Interger.MAX_VALUE的LinkedBlockingQueueCachedThreadPool
: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。核心线程数为0,最大线程数为Interger.MAX_VAULE,任务队列为无容量的SynchronousQueueScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池。核心线程为n,最大线程为Interger.MAX_VALUE,任务队列长度为最大Interger_MAX_VALUE的DelayQueue
《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors
去创建,而是通过 ThreadPoolExecutor
构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors
返回线程池对象的弊端如下:
FixedThreadPool
和SingleThreadExecutor
:使用的是无界的LinkedBlockingQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
:使用的是同步队列SynchronousQueue
, 允许创建的线程数量为Integer.MAX_VALUE
,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。ScheduledThreadPool
和SingleThreadScheduledExecutor
:使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
ThreadPoolExecutor使用详解
其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。
# Execute原理
https://excalidraw.com/#json=Y7a8uMRrIrZNGfr7GnQ0M,2qUB-ZmXM-hwP0a3MWnA1A
当一个任务提交至线程池之后:
- 线程池首先当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2.
- 判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入3.
- 如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。
运行机制(当任务来了之后的执行流程):
- 判断核心线程数是否已满;如果未满创建核心线程执行任务;如果满了执行后续操作。
- 判断任务队列是否已满;如果未满将任务添加到队列;如果满了执行后续流程。
- 判断最大线程数是否已满;如果未满创建临时线程执行任务;如果满了执行后续流程。
- 执行拒绝策略(内置4种拒绝策略+自定义的拒绝策略)。
当ThreadPoolExecutor创建新线程时,通过CAS来更新线程池的状态ctl.
# 参数
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。workQueue
用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列: 具体可以参考JUC 集合: BlockQueue详解ArrayBlockingQueue
: 基于数组结构的有界阻塞队列,按FIFO排序任务;LinkedBlockingQueue
: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue
: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue
: 具有优先级的无界阻塞队列;
LinkedBlockingQueue
比ArrayBlockingQueue
在插入删除节点性能方面更优,但是二者在put()
, take()
任务的时均需要加锁,SynchronousQueue
使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer()
.
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue.keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止;unit
keepAliveTime的单位threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactoryhandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:AbortPolicy
: 直接抛出异常,默认策略;CallerRunsPolicy
: 用调用者所在的线程来执行任务;DiscardOldestPolicy
: 丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy
: 直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
※线程池ThreadPoolExecutor
常见的线程池有:
无缓存线程
· 定长线程池(最常见,如Glide)
FixedThreadPool:根据入参决定有多少个核心线程,无缓存线程。 可重用固定线程数的线程池。(适用于负载比较重的服务器) FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列 中的任务。
· 单线程线程池
SingleThreadExecutor:只有一个核心线程,最大线程也为1,无缓存线程。所有任务在此线程中FIFO进行只会创建一个线程执行任务。(适用于需要保证顺序执行各个任 务;并且在任意时间点,没有多线程活动的场景。) SingleThreadExecutorl也使用无界队列LinkedBlockingQueue作为工作队列 若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先 出的顺序执行队列中的任务。
· 定时线程池
ScheduledThreadPool:只有入参数量的核心线程,无缓存线程。继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行 任务,或者定期执行任务。使用DelayQueue作为任务队列。如newScheduledThreadPool,用于定时任务。
无核心线程
· 缓存线程池
CachedThreadPool:无核心线程,无限制地增加执行完成就销毁(根据keepaliveTime决定)的缓存线程,是一个会根据需要调整线程数量的线程池。(大小无界,适用于执行很 多的短期异步任务的小程序,或负载较轻的服务器) CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线 程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕 后,将返回线程池进行复用。
作用
- 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。一个线程保留1M大小的内存空间,有效降低OOM
- 提高相应速度:因为省去了创建线程这个步骤,所以在拿到任务时,可以立刻开始执行。
线程数应该怎么设置
- 如果任务是IO密集型,一般线程数需要设置2倍CPU数以上(2N),以此来尽量利用CPU资源。
- 如果任务是CPU密集型,一般线程数量只需要设置CPU数加1即可,更多的线程数也只能增加上下文切换,不能增加CPU利用率。
在计算密集型任务中,将线程池大小设置为 CPU 核心数 + 1 的原因是为了应对可能出现的阻塞情况。 虽然计算密集型任务主要消耗 CPU 资源,但在实际应用中,任务内部可能仍然存在一些阻塞操作, 例如:
同步 IO: 如果任务需要进行磁盘读写或网络通信等 IO 操作,并且这些操作是同步阻塞的,那么当前线程会被阻塞, 无法继续执行计算任务。
锁竞争: 如果任务中存在对共享资源的访问,并且使用了锁机制进行同步, 那么当多个线程同时竞争锁时, 部分线程会被阻塞, 等待获取锁。
页面错误: 当线程访问的内存页面不在物理内存中时,会发生页面错误, 导致线程被阻塞, 等待操作系统将页面从磁盘加载到内存。 如果线程池大小刚好等于 CPU 核心数,那么当一个线程被阻塞时,CPU 就无法充分利用,导致整体性能下降。 而增加一个额外的线程,可以确保在某个线程被阻塞时, 仍然有足够的线程可以继续执行计算任务, 从而提高 CPU 利用率和整体性能。 当然,这只是一个经验法则,并不是绝对的。 在某些情况下, 如果任务中不存在阻塞操作, 或者阻塞情况非常少见, 那么将线程池大小设置为 CPU 核心数也可能足够。 最佳的线程池大小仍然需要根据你的具体应用场景和硬件环境进行调整和测试。
简单来说就是 io多,则用更多线程充分利用cpu;计算多,则用少的线程数减少线程切换,但仍存在的io操作使数量应为n+1;
Android一般认为多数操作是IO密集,如网络io,本地文件io,所以会设置2N
Android系统对每个进程线程数限制
root 下adb shell cat /proc/sys/kernel/threads-max
结果如:57439
但每个线程1M左右,基本上几百个线程就可能OOM了
常见的三方库线程池默认数量
Okhttp
OkHttp中的线程池是定义在分发器中的,即定义在Dispatcher
1 | public synchronized ExecutorService executorService() { |
用的其实相当于就是一个无核心线程,最大线程池为Integer.MAX_VALUE,任务队列为SynchronousQueue的缓存线程池
高并发,最大吞吐量。SynchronousQueue队列是无容量队列,
在OkHttp中,配置的线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,线程的存活时间为60s,采用的队列是SynchronousQueue。
- okhttp
默认同时支持 64 个异步请求(不考虑同步请求),一个 host 同时最多请求 5 个
- okhttp 内部的
线程池都是 CacheThreadPool
:核心线程数为 0,非核心线程数无限,永远添加不到等待队列中 okhttpClient 如果不单例,会出现 oom
:因为大量的 Dispatcher 对象,不同的对象会使用不同的线程去发起网络请求,从而导致线程过多,OOM
Glide
Glide用的都是核心线程数与最大线程数一致(cpu数量与4的最小值),任务队列为PriorityBlockingQueue的定长线程池。(所以Glide的最大并发量是四个图片?)
glide加载的线程池的配置,使用cpu数量与4的最小值,即线程池的核心线程和最大线程数不超过4个
线程池关闭方法区别
shutdown() 、 shutdownNow() 、 awaitTermination() 的用法和区别
shutdown()
将线程池状态置为SHUTDOWN,并不会立即停止:
- 停止接收外部submit的任务
- 内部正在跑的任务和队列里等待的任务,会执行完
- 等到第二步完成后,才真正停止
shutdownNow()
将线程池状态置为STOP。企图立即停止,事实上不一定:
- 跟shutdown()一样,先停止接收外部提交的任务
- 忽略队列里等待的任务
- 尝试将正在跑的任务interrupt中断
返回未执行的任务列表
awaitTermination()
awaitTermination(long timeOut, TimeUnit unit)
当前线程阻塞,直到
- 等所有已提交的任务(包括正在跑的和队列中等待的)执行完
- 或者等超时时间到
- 或者线程被中断,抛出InterruptedException
然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)
小结:
- 优雅的关闭,用shutdown(),停止接受任务并等待进行中的和队列中的任务都执行完后停止。
- 想立马中断并关闭,并得到未执行任务列表,用shutdownNow(),会interrupt正在进行的任务并忽略队列中任务,返回未执行的任务队列
- 优雅的关闭,并允许关闭声明后新任务能提交,用awaitTermination()
线程池都有哪几种工作队列?
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量 通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()、newSingleThreadExecutor使用了 这个队列。
SynchronousQueue:是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
DelayedWorkQueue:是一个阻塞队列。保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。newScheduledThreadPool使用了这个队列。
假设向线程池提交任务时,核心线程都被占用的情况下:
ArrayBlockingQueue
:基于数组的阻塞队列,初始化需要指定固定大小。
当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了之后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。所以最终可能出现后提交的任务先执行,而先提交的任务一直在等待。
LinkedBlockingQueue
:基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定。
当指定大小后,行为就和ArrayBlockingQueu
一致。而如果未指定大小,则会使用默认的Integer.MAX_VALUE
作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功。最终意味着所有任务都是在核心线程执行。如果核心线程一直被占,那就一直等待。
SynchronousQueue
: 无容量的队列。
使用此队列意味着希望获得最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败。而失败后如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Integer.MAX_VALUE
就实现了真正的无等待。
//lqr:TODO https://juejin.cn/post/6847902225730109454
拒绝策略rejectHander
当 Executor 已关闭时,以及当 Executor 对最大线程和工作队列容量使用有限界限且已饱和时,在方法execute(Runnable)中提交的新任务将被拒绝。在任一情况下, execute方法都会调用其RejectedExecutionHandler
的RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
方法。提供了四种预定义的处理程序策略:
在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在被拒绝时会抛出运行时
RejectedExecutionException
。1
2
3throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());在ThreadPoolExecutor.CallerRunsPolicy中,调用execute的线程本身会运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
1
2
3if (!e.isShutdown()) {
r.run();
}在ThreadPoolExecutor.DiscardPolicy中,无法执行的任务将被直接丢弃。
1
//do nothing
在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器未关闭,则工作队列头部的任务将被删除,然后重试执行(这可能会再次失败,导致重复此操作。)
1
2
3
4if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
如果提交任务时,线程池队列已满,会发生什么
如果使用的LinkedBlockingQueue,也就是无界队列的话,继续添加任务到阻塞队列中等待执行,因为LinkedBlockingQueue可以无限存放任务;如果使用的是有界队列比方说ArrayBlockingQueue的话,则会使用拒绝策略RejectedExecutionHandler处理满了的任务。