JUC包下的CyclicBarrier类

CyclicBarrier简介

中文意思是循环栅栏,在多个线程执行的时候,线程到达一个栅栏后进入阻塞,等待所有线程到齐后再继续执行,下面利用CyclicBarrrier演示团购的逻辑,满3人成团。

		CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        ExecutorService executorService = Executors.newCachedThreadPool();
        
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "组团中..." + cyclicBarrier.getParties());
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(8));
                    //每次调用时会对构造方法传入的3做自减操作,减到0后再往下执行
                    //满3人才能组团成功,未满时其他人需要等待
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "满3人组团成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();

当组团成功之后,卖家要发货,这里利用构造方法打印发货操作,在CyclicBarrier中还有一个构造方法支持传入Runnable类型的对象,我们可以在其run方法中编写当所有线程到达栅栏之后执行的代码。

CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
    System.out.println("卖家发货");
});
ExecutorService executorService = Executors.newCachedThreadPool();

for (int j = 0; j < 2; j++) {//组团2队
    for (int i = 0; i < 3; i++) {
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "组团中...");
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(8));
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "满3人组团成功");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
}

executorService.shutdown();

如下图所示,当第一个线程t1到达栅栏之后,变量count的值会做自减操作,因为count不等于0,则t1线程阻塞,等待t2,t3到达之后,count变为0,三个线程会跳过栅栏继续执行。之后栅栏中的count值会再恢复为最初的值,等待其他线程的到达。这样栅栏就可以重复使用了

CyclicBarrier如何保证重复使用

CyclicBarrier中的成员变量parties和count,我们在调用其构造方法传入的整数会赋值给final修饰的parties,然后parties再赋值给count,当有线程到达栅栏时,count会进行减1的操作,当count值为0后会重置为parties的值,这样CyclicBarrier就可以被重复使用了。在CyclicBarrier中有一个静态内部类Generation,该类用来表示分代,当栅栏被冲破之后算是一代,通过nextGeneration()方法来维护着换代等操作,其源码如下:

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

CyclicBarrier工作流程

CyclicBarrier中重载的await方法,最终都调用了dowait方法,该方法中使用了ReentrantLock锁保证了线程安全,通过Condition来阻塞和唤醒线程。dowait方法中的操作是线程安全的。

当调用await方法时,count计数器进行自减操作,之后对count进行判断:

  • count == 0 是true,会调用构造方法中传入的Runnable(倘若没有传,则不会调用)。然后调用nextGeneration()方法开始下一个分代,该方法将会重置count值为parties,然后再调用condition的signalAl1方法,唤醒所有在屏障前面等待的线程,让其开始继续执行,并且创建新的Generation实例。
  • count == 0是false,那么当前的调用线程将会通过condition的await方法,当前线程会在屏障前进行等待。