java并发工具类 CyclicBarrier

CyclicBarrier简介
CyclicBarrier,是JDK1.5的java.util.concurrent并发包中提供的一个并发工具类。
所谓Cyclic即 循环 的意思,所谓Barrier即 屏障 的意思。
所以综合起来,CyclicBarrier指的就是 循环屏障,虽然这个叫法很奇怪,但是确能很好地表示它的作用。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。
CyclicBarrier栅栏

CyclicBarrier和CountDownLatch是非常类似的,CyclicBarrier核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。
CyclicBarrier类是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(Common Barrier Point)。
CyclicBarrier类是一种同步机制,它能够对处理一些算法的线程实现同。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。
通过调用CyclicBarrier对象的await()方法,两个线程可以实现互相等待。一旦N个线程在等待CyclicBarrier达成,所有线程将被释放掉去继续执行。
怎么使用 CyclicBarrier
1 2
| public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction)
|
parties : 是参与线程的个数
其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
**barrierAction ** : 优先执行线程
用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

重要方法
1 2
| public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
|
- 线程调用 await() 表示自己已经到达栅栏
- BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
基本使用
代码实现
一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class CyclicBarrierTest {
private static Random random = new Random();
public static void execute(CyclicBarrier barrier) { long sleepTime = random.nextInt(10); long threadId = Thread.currentThread().getId(); try { Thread.sleep(sleepTime * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程ID" + threadId + ",准备任务完成耗时:" + sleepTime + "当前时间" + System.currentTimeMillis());
try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程ID" + threadId + ",开始执行任务,当前时间:" + System.currentTimeMillis()); }
public static void main(String[] args) { int threadNum = 10; CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任务开始...")); ExecutorService executor = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { executor.submit(() -> { execute(barrier); }); } } }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| 线程ID12,准备任务完成耗时:0当前时间1565163947881 线程ID17,准备任务完成耗时:0当前时间1565163947881 线程ID16,准备任务完成耗时:0当前时间1565163947881 线程ID20,准备任务完成耗时:3当前时间1565163950881 线程ID14,准备任务完成耗时:3当前时间1565163950881 整理任务开始... 线程ID14,开始执行任务,当前时间:1565163950881 线程ID12,开始执行任务,当前时间:1565163950881 线程ID17,开始执行任务,当前时间:1565163950881 线程ID16,开始执行任务,当前时间:1565163950881 线程ID20,开始执行任务,当前时间:1565163950881 线程ID18,准备任务完成耗时:4当前时间1565163951881 线程ID13,准备任务完成耗时:5当前时间1565163952881 线程ID21,准备任务完成耗时:7当前时间1565163954881 线程ID19,准备任务完成耗时:9当前时间1565163956882 线程ID15,准备任务完成耗时:9当前时间1565163956882 整理任务开始... 线程ID15,开始执行任务,当前时间:1565163956882 线程ID18,开始执行任务,当前时间:1565163956882 线程ID13,开始执行任务,当前时间:1565163956882 线程ID21,开始执行任务,当前时间:1565163956882 线程ID19,开始执行任务,当前时间:1565163956882
|
从打印结果可以看出,所有线程会等待全部线程到达栅栏之后才会继续执行,并且最后到达的线程会完成 Runnable 的任务。
CyclicBarrier 使用场景
可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier 与 CountDownLatch 区别
- CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
- CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
- CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
- 在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。
- 同时,CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。