ThreadPoolExecutor源码分析
前言
ThreadPoolExecutor
是一个线程池的实现.Java
提供了Executors
工厂类来创建ExecutorService
线程池实例。
构造函数
ThreadPoolExecutor
的构造方法如下
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
把线程池想象一个水壶, corePoolSize
就相当于液面警戒线, 虽然满了还可以再加水, 但是不可能超过水壶的极限高度maximumPoolSize
.keepAliveTime
则是允许线程空闲下来的时间, TimeUnit
是时间单位.workQueue
是阻塞队列, 用于存储超过corePoolSize
但是未满maximumPoolSize
的线程.threadFactory
用于创建线程池内的线程, 默认是DefaultThreadFactory
handler
是拒绝策略, 用于回调执行添加线程失败的代码.
线程池内部状态流转
Java
中的int
类型有32
位, ThreadPoolExecutor
使用ctl
的高3
位表示线程池的运行状态, 低29
位表示线程池中的线程数.
线程池的状态有:
RUNNING
: 运行状态, 可以接收任务SHUTDOWN
: 调用了shutdown()
方法后的状态, 等待所有任务执行完毕后关闭线程池.STOP
: 调用了shutdownNow()
方法后的状态, 强制结束所有任务并关闭线程池.TIDYING
: 空闲状态, 所有任务都执行完毕。TERMINATED
: 终止状态,调用了terminated()
方法后的状态.下面1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 11111111111111111111111111111, 29位全1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
// Packing and unpacking ctl
// 获取运行状态, 高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量, 低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取 ctl 变量, 组合 高3位 和 低29位
private static int ctlOf(int rs, int wc) { return rs | wc; }
}3
个静态方法用于拆分和组合运行状态和线程数量.
假设线程池运行状态为RUNING
, 线程池内有2
个线程.
那么ctl
变量的值的二进制形式就是111 00000000000000000000000000010
.
根据ctl
获取运行状态:rs = runStateOf(ctl)
得到111 00000000000000000000000000000
根据ctl
获取线程数量:wc = workerCountOf(ctl)
得到000 00000000000000000000000000010
如果之后需要根据rs
和wc
获取ctl
, 则调用ctl = ctlOf(rs, wc)
得到111 00000000000000000000000000010
添加一个Runnable
execute(Runnable command)
ThreadPoolExecutor
实现了Executor
接口的唯一一个方法void execute(Runnable command)
;
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
- 只要
Core Pool
没有填满, 线程池就会一直创建线程到Core Pool
中. - 一旦
Core Pool
填满了, 就添加到阻塞队列(构造函数传入)中.
2.2. 如果添加到阻塞队列成功, 且当前线程池的线程数为0, 则创建一个非Core
线程.- 如果添加到阻塞队列失败, 尝试创建一个非
Core
线程, 仍然失败则调用reject
处理器(构造函数传入)的回调函数.
- 如果添加到阻塞队列失败, 尝试创建一个非
简单点说, 线程池会先填满corePoolSize
, 再填满队列, 再填满maximumPoolSize
, 如果还有则调用reject
回调方法.
addWorker(Runnable firstTask, boolean core)
上面的代码一直围绕着addWorker
方法, 这个方法可以创建Core
线程和非Core
线程.
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
从上面代码可以看出, Core
线程和非Core
线程本质都是一个Worker
, 甚至这个Worker
都没有属性来标识是否为Core
线程, 而是通过一堆线程池状态来判断创建的是Core
线程还是非Core
线程.
创建完后就执行这个Worker
线程, 从阻塞队列里不停的取任务来执行.
Worker
Worker
是ThreadPoolExecutor
的内部类.
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
runWorker(Worker w)
间接调用了ThreadPoolExecutor
的runWorker
方法.
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
直接调用了run
方法, 完成线程任务.
getTask()
getTask()
从阻塞队列中获取提交到线程池的任务.
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
poll
方法支持延迟从队列中获取元素, take
则马上从队列中获取元素.
当超时后, getTask()
返回null
, 则runWorker
方法的无限循环也跑不下去了, 自然就结束了这个线程.
一个交给线程池的线程就执行完毕了. 省略了很多状态转换的代码, 如果看不懂可以结合源码阅读。
关闭线程池
关闭线程池有两种方法shutdown()
和shutdownNow()
.
参考资料提到了一种优雅的关闭线程池的方法, 如下:
1 | long start = System.currentTimeMillis(); |