JUC CyclicBarrier 可重用屏障

介绍

CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。

使用示例

public class TestCyclicBarrier {
    private static final int THREAD_NUM = 5;

    public static class WorkerThread implements Runnable {
     CyclicBarrier barrier;

     public WorkerThread(CyclicBarrier b) {
         this. barrier = b;
     }

     @Override
     public void run() {
         try {
          String id = "ID:" + Thread.currentThread().getId();

          System. out.println(id + " before barrier");

           barrier.await(); // 线程在这里等待,直到所有线程都到达barrier。

          System. out.println(id + " after barrier");
         } catch (Exception e) {
          e. printStackTrace();
         }
     }
    }

    public static void main(String[] args) {
     CyclicBarrier cb = new CyclicBarrier( THREAD_NUM, new Runnable() {
         // 当所有线程到达barrier时执行
         @Override
          public void run() {
          System. out.println( "\nID: " + Thread.currentThread().getId()
              + " doing barrier work .\n");
         }
     });

     for (int i = 0; i < THREAD_NUM; i++) {
         new Thread( new WorkerThread(cb)).start();
     }
    }
}

输出结果如下:

ID:13 before barrier
ID:14 before barrier
ID:10 before barrier
ID:12 before barrier
ID:11 before barrier

ID: 14 doing barrier work .

ID:14 after barrier
ID:13 after barrier
ID:10 after barrier
ID:11 after barrier
ID:12 after barrier

实现

CyclicBarrier 是可重用的,那么就需要一定的结构来维持每次重用的信息,其定义了一个内部类Generation来提供这个功能。

private static class Generation {
    boolean broken = false;
}

内部属性

// 由于涉及多个线程之间的同步,对共享状态访问的协调应该用锁来简化
private final ReentrantLock lock = new ReentrantLock();

// 在所有线程都到达之前,用于使线程等待的条件对象。
// 最后一个线程到达后,唤醒所有此对象上等待的线程。
private final Condition trip = lock.newCondition();

// 参与协同同步的线程数
private final int parties;

// 所有线程到达后、释放之前执行的命令。
private final Runnable barrierCommand;

// 当前的代
private Generation generation = new Generation();

// 等待到达的线程数
private int count;

核心的 wait 逻辑

CyclicBarrier 的 await 方法都会调用 dowait 方法。

// 核心代码,覆盖了不同的策略
private int dowait(boolean timed, long nanos) throws InterruptedException,
          BrokenBarrierException, TimeoutException {
     // 先加锁,确保线程安全性。
     final ReentrantLock lock = this.lock;
      lock.lock();
     try {
          final Generation g = generation;

          if (g.broken)
              throw new BrokenBarrierException();

          if (Thread.interrupted()) {
              breakBarrier();
              throw new InterruptedException();
          }

           int index = --count; // 等待线程数减 1。

           if (index == 0) { // 当前线程是最后到达线程,释放所有线程。
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();

                   ranAction = true;
                   nextGeneration();
                   return 0;
              } finally {
                   if (!ranAction)
                        breakBarrier();
              }
          }

           // 非第一个到达的线程要进入等待,放在循环里是防止虚假唤醒。
           // 直到  tripped, broken, interrupted, or timed out
           for (;;) {
              try {
                   if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
              } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                   } else {
                         // We're about to finish waiting even if we had not
                         // been interrupted, so this interrupt is deemed to
                         // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                   }
              }

              if (g.broken)
                   throw new BrokenBarrierException();

              if (g != generation)
                   return index;

              if (timed && nanos <= 0L) {
                   breakBarrier();
                   throw new TimeoutException();
              }
          }
     } finally {
          lock.unlock();
     }
}

注意,上面的代码多处调用了 breakBarrier 方法,它的最主要作用就是唤醒所有等待线程。

reset 方法

public void reset() {
      final ReentrantLock lock = this.lock;
     lock.lock();
      try {
          breakBarrier(); // 打破当前的代,以释放当前还在等待线程。
          nextGeneration(); // 开启新的代。
     } finally {
          lock.unlock();
     }
}

// 把当前的代标为broken,唤醒所有的等待线程。持有锁时调用。
private void breakBarrier() {
     generation.broken = true;
     count = parties;
     trip.signalAll();
}

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

JUC CyclicBarrier 可重用屏障》上有2个想法

  1. 您好,“CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。”这句话经过我的验证貌似不太合理。我的代码如下:

    @Slf4j
    public class CyclicBarrierTest {

    private static CyclicBarrier barrier = new CyclicBarrier(2, () -> log.info(Thread.currentThread().getName() + ” is the last thread!”));

    public static void main(String[] args) {
    new Thread(() -> {
    log.info(“线程1开始执行”);
    SleepUtil.sleep(20);
    try {
    barrier.await();
    } catch (Exception e) {
    e.printStackTrace();
    }
    log.info(“线程1恢复执行”);
    }).start();

    new Thread(() -> {
    log.info(“线程2开始执行”);
    try {
    SleepUtil.sleep(100);
    barrier.await();
    } catch (Exception e) {
    e.printStackTrace();
    }
    log.info(“线程2恢复执行”);
    }).start();
    }
    }

    按照最后一个到达的线程执行指定的任务的话,应该是线程2去执行该任务吧?但是控制台总是打印线程1执行了该任务!

    10:41:48.940 [Thread-0] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程1开始执行
    10:41:48.947 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程2开始执行
    10:41:49.051 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – Thread-1 is the last thread!
    10:41:49.051 [Thread-1] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程2恢复执行
    10:41:49.051 [Thread-0] INFO pers.amos.concurrent.countdown.CyclicBarrierTest – 线程1恢复执行

    • 没有问题的,,,你应该是混淆了输出里的线程。等待时间最长的最后到达,执行 runnable 任务。

      private static CyclicBarrier barrier = new CyclicBarrier(2,
      () -> log.info(Thread.currentThread().getName() + ” is the last thread!”));

      public static void main(String[] args) {
      new Thread(() -> {
      log.info(Thread.currentThread().getName() + ” [20] 开始执行”);
      try {
      TimeUnit.MILLISECONDS.sleep(20);
      barrier.await();
      } catch (Exception e) {
      e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + ” [20] 恢复执行”);
      }).start();

      new Thread(() -> {
      log.info(Thread.currentThread().getName() + ” [100] 开始执行”);
      try {
      TimeUnit.MILLISECONDS.sleep(100);
      barrier.await();
      } catch (Exception e) {
      e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + ” [100] 开恢复执行”);
      }).start();
      }

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据