CountDownLatch 倒数门栓
2025/12/20大约 5 分钟约 1528 字
基于 JDK 1.8 源码的 CountDownLatch 核心原理分析。
一、核心定位与设计初衷
1.1 定义
CountDownLatch 是基于 AQS 共享模式(Shared Mode)实现的同步辅助类,用于协调多个线程之间的执行顺序。核心语义:一个或多个线程阻塞等待,直到其他线程完成一系列操作后释放。
1.2 两种典型场景
| 场景 | 描述 | 模型 |
|---|---|---|
| 多等一 | 多个工作线程等待一个信号,如压测时统一发令 | 主线程 countDown,工作线程 await |
| 一等多 | 一个线程等待多个任务完成,如主线程等待资源初始化 | 工作线程 countDown,主线程 await |
// 一等多:主线程等待 N 个任务完成
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
doTask();
latch.countDown(); // 任务完成,计数 -1
}).start();
}
latch.await(); // 主线程阻塞,直到 count = 0
System.out.println("所有任务完成");二、AQS 架构映射
2.1 State 的语义
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // state 初始化为 count
}
int getCount() {
return getState();
}
}| state 值 | 含义 |
|---|---|
| N (初始值) | 需要等待 N 个 countDown 操作 |
| > 0 | 仍有未完成的操作,await 线程应阻塞 |
| = 0 | 所有操作完成,await 线程可被唤醒 |
2.2 Node 模式
所有调用 await() 的线程入队时,节点模式均为 Node.SHARED。
// AQS.acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // 入队等待
}
// 入队时指定 SHARED 模式
private void doAcquireSharedInterruptibly(int arg) {
final Node node = addWaiter(Node.SHARED); // 关键:SHARED 模式
// ...
}Node.SHARED 的意义:当一个节点被唤醒后,会检查后继是否也是 SHARED 模式,是则继续唤醒,形成链式传播。
三、源码深度解析:await() 的阻塞逻辑
3.1 await() 入口
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}3.2 tryAcquireShared() 源码
protected int tryAcquireShared(int acquires) {
// state == 0 表示门栓已打开,可以通过
// state > 0 表示门栓未打开,需要阻塞
return (getState() == 0) ? 1 : -1;
}| 返回值 | 条件 | 含义 |
|---|---|---|
| 1 | state == 0 | 获取成功,线程无需阻塞 |
| -1 | state > 0 | 获取失败,线程需进入 CLH 队列阻塞 |
核心逻辑:
await()并不尝试修改 state- 只是检查 state 是否已归零
- 归零则通过,否则阻塞
3.3 失败后的入队流程
tryAcquireShared() 返回 -1
│
▼
doAcquireSharedInterruptibly()
│
├── addWaiter(Node.SHARED) ← 创建 SHARED 节点入队
│
└── for (;;) 自旋
│
├── 前驱是 head?→ 再次 tryAcquireShared
│ │
│ └── 成功 → setHeadAndPropagate() → 传播唤醒
│
└── 失败 → park 阻塞四、源码深度解析:countDown() 与传播唤醒
4.1 countDown() 入口
public void countDown() {
sync.releaseShared(1);
}4.2 tryReleaseShared() 源码
protected boolean tryReleaseShared(int releases) {
// CAS 自旋将 state 减 1
for (;;) {
int c = getState();
if (c == 0)
return false; // 已经是 0,无需操作
int nextc = c - 1;
if (compareAndSetState(c, nextc))
// 关键:只有 state 从非零变为零时,才返回 true
return nextc == 0;
}
}关键点:
- 每次 countDown 只将 state 减 1
- 只有将 state 减为 0 的那次调用才返回
true - 返回
true后触发doReleaseShared()唤醒等待线程
4.3 传播唤醒机制(Chain Reaction)
当 tryReleaseShared 返回 true 时,AQS 执行 doReleaseShared():
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 将 head 状态置 0,防止重复唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒 head 的后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// head 没变则退出,否则继续传播
if (h == head)
break;
}
}4.4 setHeadAndPropagate() 源码
被唤醒的节点获取成功后执行:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 将自己设为新 head
// 传播条件:propagate > 0 或 head 状态为 PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 后继是共享节点,继续唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}4.5 链式传播流程图
countDown() 使 state 从 1 变为 0
│
▼
tryReleaseShared() 返回 true
│
▼
doReleaseShared()
│
▼
唤醒 head.next (Node A)
│
▼
Node A 被 unpark,从 park 处醒来
│
▼
Node A 执行 tryAcquireShared() → 返回 1 (成功)
│
▼
Node A 执行 setHeadAndPropagate()
├── 将自己设为新 head
│
└── 检查后继是否 SHARED
│
└── 是 → 调用 doReleaseShared() → 唤醒 Node B
│
└── Node B 重复上述流程 → 唤醒 Node C → ...这就是为什么一个线程 countDown 能唤醒所有等待线程:
- countDown 将 state 减为 0
- 触发 doReleaseShared 唤醒队头
- 被唤醒的节点发现后继也是 SHARED,继续唤醒
- 链式反应,直到所有 SHARED 节点被唤醒
五、面试高频对比:CountDownLatch vs CyclicBarrier
| 维度 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 底层实现 | AQS 共享模式 | ReentrantLock + Condition |
| 阻塞队列 | CLH 队列 | Condition 等待队列 |
| 计数方式 | 减法(countDown 减 1) | 加法(await 时 +1) |
| 是否可重用 | 不可重用,计数归零后无法重置 | 可重用,一代结束后自动重置 |
| 谁来计数 | 任意线程 countDown | 参与线程 await |
| 到达动作 | 无 | 可指定 barrierAction |
| 适用场景 | 等待事件完成 | 线程汇合点 |
本质区别:
- CountDownLatch:事件驱动,N 个事件完成后释放等待
- CyclicBarrier:线程驱动,N 个线程到达后一起继续
六、生产环境避坑
6.1 await 死锁风险
// 危险:如果某个任务异常退出,没有调用 countDown
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
doTask(); // 如果这里抛异常
} finally {
latch.countDown(); // 必须在 finally 中
}
}).start();
}
// 建议:使用超时机制
boolean success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
// 超时处理
}6.2 计数器不可重置
CountDownLatch latch = new CountDownLatch(1);
latch.countDown(); // state = 0
// 无法重置!以下 await 会立即返回
latch.await(); // 不会阻塞需要重用时,应选择 CyclicBarrier 或手动创建新实例。
6.3 最佳实践
CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
executor.submit(() -> {
try {
doTask();
} catch (Exception e) {
log.error("Task failed", e);
} finally {
latch.countDown(); // 确保一定执行
}
});
}
// 带超时的等待
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new TimeoutException("Tasks not completed in time");
}七、总结
| 知识点 | 要点 |
|---|---|
| AQS 模式 | 共享模式(Node.SHARED) |
| State 语义 | 计数器,初始化为 N |
| await | state > 0 阻塞,state = 0 通过 |
| countDown | CAS 减 1,归零时触发唤醒 |
| 传播机制 | SHARED 节点链式唤醒 |
| 不可重用 | 计数归零后无法重置 |
| 避坑 | finally 中 countDown,await 带超时 |
