介绍
CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。
CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。
在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。
使用示例
public class TestCyclicBarrier {
private static final int THREAD_NUM = 5;
public static class WorkerThread implements Runnable {
CyclicBarrier barrier;
public WorkerThread(CyclicBarrier b) {
this. barrier = b;
}
@Override
public void run() {
try {
String id = "ID:" + Thread.currentThread().getId();
System. out.println(id + " before barrier");
barrier.await(); // 线程在这里等待,直到所有线程都到达barrier。
System. out.println(id + " after barrier");
} catch (Exception e) {
e. printStackTrace();
}
}
}
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier( THREAD_NUM, new Runnable() {
// 当所有线程到达barrier时执行
@Override
public void run() {
System. out.println( "\nID: " + Thread.currentThread().getId()
+ " doing barrier work .\n");
}
});
for (int i = 0; i < THREAD_NUM; i++) {
new Thread( new WorkerThread(cb)).start();
}
}
}
输出结果如下:
ID:13 before barrier ID:14 before barrier ID:10 before barrier ID:12 before barrier ID:11 before barrier ID: 14 doing barrier work . ID:14 after barrier ID:13 after barrier ID:10 after barrier ID:11 after barrier ID:12 after barrier
实现
代
CyclicBarrier 是可重用的,那么就需要一定的结构来维持每次重用的信息,其定义了一个内部类Generation
来提供这个功能。
private static class Generation {
boolean broken = false;
}
内部属性
// 由于涉及多个线程之间的同步,对共享状态访问的协调应该用锁来简化
private final ReentrantLock lock = new ReentrantLock();
// 在所有线程都到达之前,用于使线程等待的条件对象。
// 最后一个线程到达后,唤醒所有此对象上等待的线程。
private final Condition trip = lock.newCondition();
// 参与协同同步的线程数
private final int parties;
// 所有线程到达后、释放之前执行的命令。
private final Runnable barrierCommand;
// 当前的代
private Generation generation = new Generation();
// 等待到达的线程数
private int count;
核心的 wait 逻辑
CyclicBarrier 的 await 方法都会调用 dowait 方法。
// 核心代码,覆盖了不同的策略
private int dowait(boolean timed, long nanos) throws InterruptedException,
BrokenBarrierException, TimeoutException {
// 先加锁,确保线程安全性。
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 等待线程数减 1。
if (index == 0) { // 当前线程是最后到达线程,释放所有线程。
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 非第一个到达的线程要进入等待,放在循环里是防止虚假唤醒。
// 直到 tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
注意,上面的代码多处调用了 breakBarrier 方法,它的最主要作用就是唤醒所有等待线程。
reset 方法
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // 打破当前的代,以释放当前还在等待线程。
nextGeneration(); // 开启新的代。
} finally {
lock.unlock();
}
}
// 把当前的代标为broken,唤醒所有的等待线程。持有锁时调用。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。
您好,“CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。”这句话经过我的验证貌似不太合理。我的代码如下:
@Slf4j
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(2, () -> log.info(Thread.currentThread().getName() + ” is the last thread!”));
public static void main(String[] args) {
new Thread(() -> {
log.info(“线程1开始执行”);
SleepUtil.sleep(20);
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
log.info(“线程1恢复执行”);
}).start();
new Thread(() -> {
log.info(“线程2开始执行”);
try {
SleepUtil.sleep(100);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
log.info(“线程2恢复执行”);
}).start();
}
}
按照最后一个到达的线程执行指定的任务的话,应该是线程2去执行该任务吧?但是控制台总是打印线程1执行了该任务!
10:41:48.940 [Thread-0] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程1开始执行
10:41:48.947 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程2开始执行
10:41:49.051 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – Thread-1 is the last thread!
10:41:49.051 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程2恢复执行
10:41:49.051 [Thread-0] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程1恢复执行
没有问题的,,,你应该是混淆了输出里的线程。等待时间最长的最后到达,执行 runnable 任务。
private static CyclicBarrier barrier = new CyclicBarrier(2,
() -> log.info(Thread.currentThread().getName() + ” is the last thread!”));
public static void main(String[] args) {
new Thread(() -> {
log.info(Thread.currentThread().getName() + ” [20] 开始执行”);
try {
TimeUnit.MILLISECONDS.sleep(20);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
log.info(Thread.currentThread().getName() + ” [20] 恢复执行”);
}).start();
new Thread(() -> {
log.info(Thread.currentThread().getName() + ” [100] 开始执行”);
try {
TimeUnit.MILLISECONDS.sleep(100);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
log.info(Thread.currentThread().getName() + ” [100] 开恢复执行”);
}).start();
}