CyclicBarrier 循环栅栏
2025/12/20大约 6 分钟约 1761 字
基于 JDK 1.8 源码的 CyclicBarrier 核心原理分析。
一、核心定位与命名解析
1.1 Cyclic(循环)
可重置特性:当所有线程到达栅栏点后,栅栏自动重置为初始状态,可供下一轮同步使用。这与 CountDownLatch 的一次性设计形成鲜明对比。
1.2 Barrier(栅栏)
同步语义:阻拦线程直到满员。所有参与线程必须到达栅栏点后,才能统一放行。
// 5 个线程,在栅栏点汇合
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("全员到齐,开始下一阶段");
});
for (int i = 0; i < 5; i++) {
new Thread(() -> {
doPhase1();
barrier.await(); // 等待其他线程
doPhase2();
}).start();
}1.3 底层基石
CyclicBarrier 不是直接继承 AQS,而是基于 ReentrantLock + Condition 组合实现。
| 同步工具 | 底层实现 |
|---|---|
| CountDownLatch | AQS 共享模式 |
| CyclicBarrier | ReentrantLock + Condition |
| Semaphore | AQS 共享模式 |
| ReentrantLock | AQS 独占模式 |
二、内部架构解剖
2.1 核心成员变量
public class CyclicBarrier {
// 互斥锁:保证计数器操作的原子性
private final ReentrantLock lock = new ReentrantLock();
// 条件变量:用于阻塞未到达的线程
private final Condition trip = lock.newCondition();
// 固定参与者数量(不变)
private final int parties;
// 当前轮次剩余未到达的线程数(每轮从 parties 递减)
private int count;
// 可选的回调任务,在栅栏打开时由最后到达的线程执行
private final Runnable barrierCommand;
// 当前代(版本号/轮次标识)
private Generation generation = new Generation();
// 内部类:标识栅栏的一个"代"
private static class Generation {
boolean broken = false; // 栅栏是否被破坏
}
}2.2 变量职责说明
| 变量 | 类型 | 职责 |
|---|---|---|
| lock | ReentrantLock | 保护 count 和 generation 的并发访问 |
| trip | Condition | 阻塞等待队列(区别于 AQS 的 CLH 队列) |
| parties | int | 固定值,表示需要同步的线程总数 |
| count | int | 剩余未到达的线程数,每次 await 减 1 |
| generation | Generation | 当前轮次的标识,用于区分不同的同步周期 |
| barrierCommand | Runnable | 栅栏打开时的回调任务 |
2.3 Generation 的关键作用
Generation 是实现"循环"特性的核心机制。
private static class Generation {
boolean broken = false;
}- 每一轮同步对应一个 Generation 实例
- 当栅栏打开后,创建新的 Generation 实例
- 线程通过比较本地持有的 Generation 与全局 Generation 是否相同,判断当前轮次是否结束
三、源码深度解析:dowait() 的执行流
3.1 await() 入口
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // 不可能发生
}
}3.2 dowait() 核心逻辑
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 获取锁
try {
final Generation g = generation; // 保存当前代的引用
// 检查栅栏是否已被破坏
if (g.broken)
throw new BrokenBarrierException();
// 检查中断
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 计数器减 1
// ========== 分支 B:最后一个到达的线程 ==========
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 执行回调任务
ranAction = true;
nextGeneration(); // 开启下一代,唤醒所有等待线程
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// ========== 分支 A:前 N-1 个到达的线程 ==========
for (;;) {
try {
if (!timed)
trip.await(); // 进入 Condition 队列等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 中断处理
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 被唤醒后的检查
if (g.broken)
throw new BrokenBarrierException();
// 代际检查:如果 generation 已更新,说明本轮结束
if (g != generation)
return index; // 正常返回
// 超时处理
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock(); // 释放锁
}
}3.3 两个分支的执行流程
分支 A:前 N-1 个到达的线程
线程到达 await()
│
▼
lock.lock() ← 获取锁
│
▼
count-- ← 计数器减 1
│
▼
count > 0,进入循环
│
▼
trip.await() ← 进入 Condition 队列,释放锁,阻塞等待
│
▼
被唤醒后,检查 g != generation
│
▼
返回 index分支 B:第 N 个(最后一个)到达的线程
线程到达 await()
│
▼
lock.lock() ← 获取锁
│
▼
count-- ← 计数器减 1
│
▼
count == 0,进入 if 分支
│
▼
barrierCommand.run() ← 执行回调任务
│
▼
nextGeneration() ← 重置栅栏,唤醒所有线程
│
▼
return 0四、难点攻克:如何实现"循环"
4.1 nextGeneration() 源码
private void nextGeneration() {
// 1. 唤醒所有在 Condition 队列中等待的线程
trip.signalAll();
// 2. 重置计数器
count = parties;
// 3. 创建新的 Generation 实例
generation = new Generation();
}三步操作的意义:
- signalAll():将所有等待线程从 Condition 队列移动到 Lock 的等待队列
- count = parties:为下一轮同步重置计数器
- new Generation():创建新的代标识,使旧线程能够检测到轮次变化
4.2 代际检查机制
线程唤醒后如何知道本轮已结束?
// dowait() 中的代际检查
final Generation g = generation; // 进入时保存旧 generation
// ... 等待被唤醒 ...
// 被唤醒后检查
if (g != generation) // 比较本地旧代与全局新代
return index; // 不相等说明本轮结束,正常返回执行时序:
时间线:
T1 ─────────────────────────────────────────────────────────>
线程A(前N-1个):
│
├── 保存 g = generation (旧代)
├── trip.await() 阻塞
│
│ <--- 此时线程B执行 nextGeneration() --->
│ <--- generation 指向新对象 --->
│
├── 被 signalAll() 唤醒
├── 检查 g != generation → true(旧代 != 新代)
└── 返回,本轮结束
线程B(最后一个):
│
├── count-- 后 count == 0
├── 执行 barrierCommand.run()
├── 调用 nextGeneration()
│ ├── trip.signalAll() ← 唤醒所有等待线程
│ ├── count = parties ← 重置计数器
│ └── generation = new Generation() ← 创建新代
└── return 0关键理解:g != generation 是引用比较,不是值比较。由于 new Generation() 创建了新对象,旧引用与新引用必然不同。
五、横向对比:CyclicBarrier vs CountDownLatch
| 维度 | CyclicBarrier | CountDownLatch |
|---|---|---|
| 实现原理 | ReentrantLock + Condition | AQS 共享模式 |
| 阻塞队列 | Condition 等待队列 | CLH 等待队列 |
| 计数方式 | 加法(await 时计数) | 减法(countDown 减计数) |
| 复用性 | 可循环使用 | 一次性 |
| 重置机制 | 自动 nextGeneration() | 不可重置 |
| 回调支持 | 支持 barrierAction | 不支持 |
| 触发时机 | 全员到达后统一放行 | 计数归零后放行 |
| 适用场景 | 多线程分阶段协同 | 等待事件完成 |
本质区别:
- CyclicBarrier:线程驱动,关注"谁到达了"
- CountDownLatch:事件驱动,关注"发生了多少事件"
六、异常与边界情况
6.1 BrokenBarrierException 触发场景
| 场景 | 说明 |
|---|---|
| 等待线程被中断 | 某个线程在 await 时被调用 interrupt() |
| 超时 | 使用 await(timeout) 且超时 |
| 手动调用 reset() | 强制重置栅栏 |
6.2 breakBarrier() 源码
private void breakBarrier() {
generation.broken = true; // 标记栅栏被破坏
count = parties; // 重置计数器
trip.signalAll(); // 唤醒所有等待线程,让它们抛异常
}6.3 中断传播机制
当一个线程被中断时,所有其他等待线程都会收到 BrokenBarrierException,避免永久阻塞。
// dowait() 中的中断处理
catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier(); // 破坏栅栏
throw ie;
} else {
Thread.currentThread().interrupt();
}
}七、总结
| 知识点 | 要点 |
|---|---|
| 底层实现 | ReentrantLock + Condition,非 AQS |
| 核心变量 | parties(总数)、count(剩余)、generation(代) |
| 等待机制 | Condition.await() 进入等待队列 |
| 唤醒机制 | 最后到达的线程执行 signalAll() |
| 循环原理 | nextGeneration() 创建新代,代际检查判断轮次结束 |
| 回调执行 | 由最后到达的线程执行 barrierAction |
| 异常处理 | 一个线程中断,所有线程抛 BrokenBarrierException |
