首页 文章
取消

CountDownLatch、CyclicBarrier、Phaser的含义与使用场景

CountDownLatch、CyclicBarrier、Phaser都是J.U.C包中控制线程调度的工具类,并且功能上一脉相承又层层递进,它们在一些经典的多线程场景中发挥着重要的作用。

CountDownLatch

含义

CountDownLatch可以简单理解为一个计数器。
其核心思想在于:为主线程设置一个屏障,主线程与所有子线程共享一个CountDownLatch实例,子线程在内部调用CountDownLatch.countDown()使得计数器-1。当所有子线程都执行完毕,计数器归零时,主线程才能执行后面的逻辑,否则挂起主线程。

使用

int parallel = 5;
// 为主线程设置一个并发数为5的屏障
CountDownLatch latch = new CountDownLatch(parallel);
for (int i = 0; i < parallel; i++) {
    // CachedThreadPool是自己封装的线程池
    CachedThreadPool.execute(() -> {
        CountDownLatch countDownLatch = latch;
        // 当前线程要执行的逻辑
        doWork();
        // 当前线程执行完毕,计数器-1
        countDownLatch.countDown();
     });
}
try {
    // 挂起主线程
    latch.await();
} catch (InterruptedException e) {
    logger.error(e.getMessage(), e);
}
// 计数器归零,主线程要执行的逻辑
doAfterWork();

CountDownLatch一般用在主线程需要汇总子线程执行结果的场景中,例如一个方法中需要独立调用N次外部接口分别获取数据,调用完毕后需要汇总每次的结果。

CyclicBarrier

含义

CyclicBarrier直译为回环栅栏,它为一组线程设置N个屏障,当所有线程都到达屏障后,当前线程才会与其他线程一起冲破屏障,否则需要等待其他线程。

CyclicBarrier与CountDownLatch的主要不同点:

  • CountDownLatch的屏障是一次性的,所有线程都到达后,屏障就失效了,即线程只能调用一次CountDownLatch.await()
    CyclicBarrier的屏障可重复使用,即线程可以反复调用CyclicBarrier.await(),每一次await()就是一个屏障。

  • CountDownLatch在使用中,一般需要主线程充当“裁判”的角色,当所有子线程执行完毕后,“裁判”对结果进行判定分析,而CyclicBarrier不需要。举个例子:CountDownLatch好比英雄联盟的一局比赛,裁判(主线程)必须等待所有选手(子线程)都准备完毕后才能宣布比赛开始;CyclicBarrier好比一种另类的百米跨栏,假设一共有3个栅栏,当发令员(主线程)枪响后所有选手(子线程)都起跑,但只有当所有选手都到达第一个栅栏前,他们才可以同时跨过栅栏,依此类推。而选手还在跨栏时,发令员或许早就离场(即主线程结束)了

使用

public class CyclicBarrierDemo {
    static class TaskThread extends Thread {
        CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + " 到达栅栏 A");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 A");

                Thread.sleep(2000);
                System.out.println(getName() + " 到达栅栏 B");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 B");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int threadNum = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadNum
                , () -> System.out.println(Thread.currentThread().getName() + " 完成最后任务"));

        for (int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }
    }
}

注意点:

  • 线程调用CyclicBarrier.await()时即代表设置了一个栅栏,不同的线程抵达栅栏的时机可以不同,但是栅栏数必须相同。换言之,不允许线程A调用M次await(),而线程B调用N次await()

  • CyclicBarrier的构造函数允许一个传递Runnable参数,当所有线程都到达栅栏时,由最后一个到达的线程执行Runnable中的逻辑

前文说过,不同的线程抵达栅栏的时机可以不同,即每个线程的run()逻辑都可以不同,只要保证await()的调用次数相同便可,上面的demo为了精简,没有定义不同逻辑的线程。

通过CyclicBarrier的含义以及使用方法可知,CyclicBarrier的适用场景一般是需要多个线程协作完成一个任务,但又要保证每个线程按计划执行,不能超前完成。

Phaser

含义

Phaser可以理解为CyclicBarrier的进阶版本,它的作用整体上看与CyclicBarrier大致相同,只是提供了更加精细的线程调度支持。因此Phaser也是三者中最为复杂的,使用不当则可能带来一些隐患,要格外小心。
PhaserCyclicBarrier的主要异同点:

  • CyclicBarrier需要在构造函数指定参与的线程数,创建完成后禁止增减线程;Phaser则无需指定参与的线程数,可以在运行过程中动态增减。

  • CyclicBarrier只提供了await()来强制当前线程等待其余线程一起冲破屏障,Phaser则提供了arrive()使得当前线程不必等待其余线程而直接进入下一阶段。arrive()需谨慎使用,避免破坏Phaser的同步机制与协作效果。

  • PhaserCyclicBarrier一样禁止不同线程的栅栏数不相等,所谓的禁止并非是语法上的限制,而是需要使用者自己避免。

使用

Demo 1

public class PhaserExample {
    public static void main(String[] args) {
        // Create a Phaser with 3 parties
        Phaser phaser = new Phaser(3);
        // Create and start 3 threads
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(new Task(phaser), "Thread " + i);
            thread.start();
        }
        // Wait for the completion of all phases
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("All phases are completed.");
    }
}

class Task implements Runnable {

    private Phaser phaser;

    public Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        // Perform first task and arrive at the phaser
        System.out.println(Thread.currentThread().getName() + " is performing task 1.");
        phaser.arriveAndAwaitAdvance();

        // Perform second task and arrive at the phaser
        System.out.println(Thread.currentThread().getName() + " is performing task 2.");
        phaser.arriveAndAwaitAdvance();

        // Perform third task and arrive at the phaser
        System.out.println(Thread.currentThread().getName() + " is performing task 3.");
        phaser.arriveAndAwaitAdvance();
    }
}

Demo 2

Phaser phaser = new Phaser();
// 注册一个party
phaser.register();
for (int i = 0; i < 3; i++) {
    phaser.register();
    CachedThreadPool.execute(() -> {
        doFirstPhaseWork();
        // 表示当前线程已到达phase-0,阻塞并等待所有线程到达phase-0
        phaser.arriveAndAwaitAdvance();
        doSecondPhaseWork();
        phaser.arriveAndAwaitAdvance();
    });
}
phaser.arriveAndAwaitAdvance();

从Demo 1可以看出,Phaser的使用大致上与CyclicBarrier相同,arriveAndAwaitAdvance()相当于CyclicBarrierawait()。但Phaser除了有arriveAndAwaitAdvance(),还有arrive()arriveAndDeregister()awaitAdvance()awaitAdvanceInterruptibly(),这些方法为Phaser带来了比CyclicBarrier更灵活细致的调度能力。

Phaser动态增减参与者的能力为一个实际业务场景提供了实现思路:一个真实的maven工程应该是由多个模块组成的,我们经常需要等待所有模块都载入完毕后再执行某些动作,此时就可以使用Phaser作为模块计数器,每一个模块开始加载时,就register()一次,由于每个模块加载的成功与否都不能影响其他模块的加载,故加载完毕后调用arrive(),然后在模块加载完毕的后置动作前调用awaitAdvance()来强制等待。

Reference

Phaser 使用介绍
Phaser并发阶段器
Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser