线程池
基于 JDK 1.8 源码的线程池核心原理与执行流程分析。
一、核心架构与状态管理
1.1 CTL 变量的设计
ThreadPoolExecutor 使用一个 AtomicInteger 类型的 ctl 变量,巧妙地将线程池状态和工作线程数量打包存储:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 约 5 亿
// 状态存储在高 3 位
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
// 拆包方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 取高 3 位
private static int workerCountOf(int c) { return c & CAPACITY; } // 取低 29 位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 打包设计优势:通过一个原子变量同时维护两个状态,避免了多变量同步的复杂性。
1.2 五大生命周期状态
| 状态 | 值(高3位) | 接收新任务 | 处理队列任务 | 说明 |
|---|---|---|---|---|
| RUNNING | 111 (负数) | ✓ | ✓ | 正常运行状态 |
| SHUTDOWN | 000 | ✗ | ✓ | 不接新任务,处理完队列 |
| STOP | 001 | ✗ | ✗ | 不接新任务,中断执行中任务 |
| TIDYING | 010 | ✗ | ✗ | 所有任务终止,workerCount=0 |
| TERMINATED | 011 | ✗ | ✗ | terminated() 钩子执行完毕 |
为什么 RUNNING 是负数?
RUNNING = -1 << 29 = 111_00000...(二进制最高位是 1,表示负数)这样设计是为了方便通过 ctl < 0 快速判断是否处于 RUNNING 状态:
private static boolean isRunning(int c) {
return c < SHUTDOWN; // SHUTDOWN = 0,RUNNING < 0
}状态流转图:
RUNNING ──shutdown()──> SHUTDOWN ──队列空+线程终止──> TIDYING ──> TERMINATED
│ ▲
└────shutdownNow()────> STOP ──线程终止──────────────┘1.3 七大核心参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)参数关系:
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ corePoolSize│ │ workQueue │ │
│ │ 核心线程 │ 满 │ 任务队列 │ 满 │
│ │ (常驻) │ ──────> │ (缓冲) │ ──────> │
│ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ │
│ │ max - core │ 满 │
│ │ 非核心线程 │ ──────> 拒绝策略 │
│ │(可回收) │ │
│ └─────────────┘ │
│ │ │
│ │ 空闲超过 keepAliveTime │
│ ▼ │
│ 线程销毁 │
│ │
└─────────────────────────────────────────────────────────────────┘二、Worker 的内部实现
2.1 类结构
private final class Worker
extends AbstractQueuedSynchronizer // 继承 AQS
implements Runnable // 实现 Runnable
{
final Thread thread; // Worker 持有的线程
Runnable firstTask; // 创建时携带的第一个任务
volatile long completedTasks; // 完成的任务计数
}为什么继承 AQS?
Worker 继承 AQS 实现了一个简单的不可重入独占锁,核心目的:
- 标识线程状态:通过
tryLock()判断线程是否在忙碌 - 支持安全中断:
shutdown()时只中断空闲线程
为什么实现 Runnable?
Worker 本身作为线程的执行体,thread.start() 会调用 Worker 的 run() 方法,进而调用 runWorker(this)。
2.2 锁的机制
Worker 锁 vs ReentrantLock
| 对比 | Worker 锁 | ReentrantLock |
|---|---|---|
| 可重入 | 不可重入 | 可重入 |
| state 初始值 | -1(禁止中断) | 0 |
| 目的 | 标识忙碌/空闲 | 通用互斥 |
为什么不可重入?
// Worker 的 AQS 实现
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) { // 只有 0 → 1 才成功
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false; // 已锁定,不可重入
}不可重入的目的是防止在执行任务时被 setCorePoolSize 等方法重入并错误中断:
// 假设 ReentrantLock(可重入),则这种调用链会出问题:
// task.run() → pool.setCorePoolSize() → interruptIdleWorkers() → worker.tryLock() 成功!
// 导致正在执行任务的线程被错误地认为是"空闲"而被中断Worker 锁 vs mainLock
| 锁 | 类型 | 作用 |
|---|---|---|
| Worker 锁 | 每个 Worker 独立持有 | 标识单个 Worker 是否在执行任务 |
| mainLock | 全局 ReentrantLock | 保护 workers Set、统计信息等共享资源 |
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>(); // 需要 mainLock 保护state = -1 的含义
Worker(Runnable firstTask) {
setState(-1); // 禁止中断,直到 runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}Worker 创建时 state = -1,此时 tryLock() 会失败(因为只有 state=0 才能获取锁)。
目的:防止线程在还没开始执行 runWorker() 时就被中断。只有在 runWorker() 开始后才会 unlock()(将 state 设为 0),此后才允许中断。
三、核心执行链路
3.1 execute() - 任务提交入口
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 第一步:核心线程数判断
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true = 核心线程
return;
c = ctl.get();
}
// 第二步:入队判断
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:防止入队后线程池被关闭
if (!isRunning(recheck) && remove(command))
reject(command);
// 防止没有 Worker 执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三步:最大线程数判断
else if (!addWorker(command, false)) // false = 非核心线程
reject(command); // 第四步:拒绝策略
}三步判断流程:
workerCount < corePoolSize?
│
┌────YES──┴──NO────┐
│ │
▼ ▼
addWorker(true) workQueue.offer()?
创建核心线程 │
┌───YES───┴───NO───┐
│ │
▼ ▼
入队成功 workerCount < maxPoolSize?
│
┌───YES───┴───NO───┐
│ │
▼ ▼
addWorker(false) reject()
创建非核心线程 拒绝策略3.2 addWorker() - 创建工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 状态检查:非 RUNNING 状态通常不允许添加
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 容量检查
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 增加 workerCount
if (compareAndIncrementWorkerCount(c))
break retry; // 成功跳出双层循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry; // 状态变了,重新外层循环
}
}
// CAS 成功后,创建 Worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建 Worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
workers.add(w); // 加入 workers Set
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程!
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w); // 失败回滚
}
return workerStarted;
}关键步骤:
- 双层 CAS 循环:增加 workerCount
- 创建 Worker:包装 firstTask
- 加锁添加到 Set:使用 mainLock 保护
- 启动线程:
t.start()
3.3 runWorker() - 线程复用的核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // state: -1 → 0,允许中断
boolean completedAbruptly = true;
try {
// 核心:死循环获取任务
while (task != null || (task = getTask()) != null) {
w.lock(); // 标记为忙碌
// 中断检查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法
try {
task.run(); // 执行任务!
} catch (RuntimeException x) {
throw x;
} finally {
afterExecute(task, null); // 钩子方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 标记为空闲
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出处理
}
}关键机制:
- 死循环:
while (task != null || (task = getTask()) != null) - lock/unlock:标识忙碌/空闲状态
- 钩子方法:
beforeExecute和afterExecute可重写用于监控
3.4 getTask() - 线程复用与回收的关键
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 状态检查
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; // 返回 null,线程退出
}
int wc = workerCountOf(c);
// 关键:决定是否超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 容量检查 + 超时检查
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 超时,线程退出
continue;
}
try {
// 核心:take vs poll
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
workQueue.take(); // 无限等待
if (r != null)
return r;
timedOut = true; // poll 超时返回 null
} catch (InterruptedException retry) {
timedOut = false;
}
}
}timed 变量的决定逻辑:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;| 条件 | timed | 获取方式 | 效果 |
|---|---|---|---|
wc <= corePoolSize 且 allowCoreThreadTimeOut = false | false | take() | 无限等待,核心线程不销毁 |
wc > corePoolSize | true | poll() | 超时返回 null,非核心线程销毁 |
allowCoreThreadTimeOut = true | true | poll() | 所有线程都可超时销毁 |
这就是"核心线程"和"非核心线程"的本质区别:不是线程本身有标记,而是通过当前 workerCount 与 corePoolSize 的比较动态决定。
四、关键机制与面试考点
4.1 四种拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy | 抛出 RejectedExecutionException | 默认策略,快速失败 |
| DiscardPolicy | 静默丢弃任务 | 允许丢失的场景 |
| DiscardOldestPolicy | 丢弃队列最老的任务 | 处理最新数据的场景 |
| CallerRunsPolicy | 由提交任务的线程执行 | 需要背压的场景 |
CallerRunsPolicy 的背压作用:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // 提交任务的线程自己执行
}
}
}当线程池满载时,提交任务的线程(如主线程)会被阻塞来执行任务,从而降低任务提交速度,实现背压效果。
4.2 Shutdown 原理
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 状态改为 SHUTDOWN
interruptIdleWorkers(); // 中断空闲线程
onShutdown(); // 钩子方法
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 关键:tryLock 区分忙碌/空闲
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); // 中断空闲线程
} finally {
w.unlock();
}
}
}
} finally {
mainLock.unlock();
}
}核心逻辑:
w.tryLock()成功 → 线程空闲(在getTask()中等待)→ 中断w.tryLock()失败 → 线程忙碌(在执行任务)→ 不中断
4.3 为什么必须是 BlockingQueue
// 如果使用普通队列
while (true) {
Runnable task = queue.poll();
if (task != null) {
task.run();
}
// 问题:队列为空时 CPU 空转!
}
// BlockingQueue 解决方案
while (true) {
Runnable task = queue.take(); // 阻塞等待,不消耗 CPU
task.run();
}BlockingQueue 的优势:
take():队列空时阻塞,不消耗 CPUpoll(timeout):超时等待,支持线程回收
五、总结
执行链路全景图
execute(task)
│
├─ workerCount < corePoolSize
│ └─ addWorker(task, true) ─────────────────────┐
│ │
├─ workQueue.offer(task) │
│ └─ 入队成功 │
│ │
├─ workerCount < maximumPoolSize │
│ └─ addWorker(task, false) ────────────────────┤
│ │
└─ reject(task) │
│
┌─────────────────────────────────────────────────────┘
│
▼
addWorker()
├─ CAS 增加 workerCount
├─ new Worker(task)
├─ workers.add(w)
└─ thread.start()
│
▼
runWorker()
│
├─ while (task != null || (task = getTask()) != null)
│ │
│ ├─ w.lock()
│ ├─ beforeExecute()
│ ├─ task.run()
│ ├─ afterExecute()
│ └─ w.unlock()
│
└─ processWorkerExit()核心要点
| 知识点 | 要点 |
|---|---|
| CTL 设计 | 高 3 位状态 + 低 29 位线程数,原子操作 |
| RUNNING 负数 | 方便 ctl < 0 快速判断 |
| Worker 锁 | 不可重入,state=-1 禁止初始中断 |
| 核心/非核心区别 | 动态判断 wc > corePoolSize,不是固定标记 |
| shutdown | tryLock() 区分忙碌/空闲线程 |
| 背压 | CallerRunsPolicy 让提交线程执行任务 |
六、个人复述理解
6.1 核心执行流程
当任务到来时,首先查看核心线程数是否已满。如果未满,创建一个核心线程执行这个任务。这里是非公平的设计——不管任务队列是否有等待的任务,新创建的核心线程都会直接带着新任务去执行。
如果核心线程数满了,尝试投入工作队列。如果工作队列未满,直接入队排队等待消费。
如果工作队列已满,查看最大线程数是否已满。如果未满,创建一个非核心线程带着任务去执行。
如果最大线程数也满了,根据拒绝策略来处理:
- AbortPolicy:抛出异常
- DiscardPolicy:静默丢弃
- DiscardOldestPolicy:淘汰最老的任务
- CallerRunsPolicy:由提交任务的线程自己执行
6.2 核心线程与非核心线程的动态区分
对于核心线程和非核心线程来说,其实并没有明显的界限,线程本身没有"核心"的标记。
这是根据线程池当前的存活线程数量来动态决定的。在
getTask()方法的死循环中,每一次循环都会重新判断timed变量:boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- 如果
wc > corePoolSize或设置了allowCoreThreadTimeOut = true,则timed = truetimed = true时使用poll(keepAliveTime)获取任务,超时返回 null 后线程销毁timed = false时使用take()阻塞等待,线程不会销毁关键点:本次循环
timed = true(被认为是"非核心"),下一次循环可能因为有其他线程退出导致wc <= corePoolSize,此时timed = false,这个线程就变成了所谓的"核心线程"。身份是动态变化的!
6.3 Worker 结构与锁机制
Worker 继承了 AQS 同时实现了 Runnable 接口,代表它既是一个任务执行体,又拥有锁的能力。
Worker 的
run()方法中就一个函数调用:runWorker(this)。这是一个死循环,不断获取任务、执行任务。每次获取到任务去执行时,首先会
lock(),任务结束后才会unlock()。这样设计是为了能够判断线程是忙碌还是空闲,方便shutdown()时进行中断:
tryLock()成功 → 线程空闲 → 可以中断tryLock()失败 → 线程忙碌 → 不中断补充细节:
- Worker 使用不可重入锁,防止执行任务时被
setCorePoolSize()等方法重入并错误中断- Worker 初始
state = -1,此时tryLock()会失败,防止线程还没开始执行runWorker()就被中断
