CountDownLatch、CyclicBarrier、Phaser都是J.U.C包中控制线程调度的工具类,并且功能上一脉相承又层层递进,它们在一些经典的多线程场景中发挥着重要的作用。
CountDownLatch
含义
CountDownLatch
可以简单理解为一个计数器。
其核心思想在于:为主线程设置一个屏障,主线程与所有子线程共享一个CountDownLatch
实例,子线程在内部调用CountDownLatch.countDown()
使得计数器-1。当所有子线程都执行完毕,计数器归零时,主线程才能执行后面的逻辑,否则挂起主线程。
使用
int parallel = 5;
// 为主线程设置一个并发数为5的屏障
CountDownLatch latch = new CountDownLatch(parallel);
for (int i = 0; i < parallel; i++) {
// CachedThreadPool是自己封装的线程池
CachedThreadPool.execute(() -> {
CountDownLatch countDownLatch = latch;
// 当前线程要执行的逻辑
doWork();
// 当前线程执行完毕,计数器-1
countDownLatch.countDown();
});
}
try {
// 挂起主线程
latch.await();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
// 计数器归零,主线程要执行的逻辑
doAfterWork();
CountDownLatch
一般用在主线程需要汇总子线程执行结果的场景中,例如一个方法中需要独立调用N次外部接口分别获取数据,调用完毕后需要汇总每次的结果。
CyclicBarrier
含义
CyclicBarrier
直译为回环栅栏,它为一组线程设置N个屏障,当所有线程都到达屏障后,当前线程才会与其他线程一起冲破屏障,否则需要等待其他线程。
CyclicBarrier与CountDownLatch的主要不同点:
CountDownLatch
的屏障是一次性的,所有线程都到达后,屏障就失效了,即线程只能调用一次CountDownLatch.await()
。
而CyclicBarrier
的屏障可重复使用,即线程可以反复调用CyclicBarrier.await()
,每一次await()
就是一个屏障。CountDownLatch
在使用中,一般需要主线程充当“裁判”的角色,当所有子线程执行完毕后,“裁判”对结果进行判定分析,而CyclicBarrier
不需要。举个例子:CountDownLatch
好比英雄联盟的一局比赛,裁判(主线程)必须等待所有选手(子线程)都准备完毕后才能宣布比赛开始;CyclicBarrier
好比一种另类的百米跨栏,假设一共有3个栅栏,当发令员(主线程)枪响后所有选手(子线程)都起跑,但只有当所有选手都到达第一个栅栏前,他们才可以同时跨过栅栏,依此类推。而选手还在跨栏时,发令员或许早就离场(即主线程结束)了。
使用
public class CyclicBarrierDemo {
static class TaskThread extends Thread {
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " 到达栅栏 A");
barrier.await();
System.out.println(getName() + " 冲破栅栏 A");
Thread.sleep(2000);
System.out.println(getName() + " 到达栅栏 B");
barrier.await();
System.out.println(getName() + " 冲破栅栏 B");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum
, () -> System.out.println(Thread.currentThread().getName() + " 完成最后任务"));
for (int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
注意点:
线程调用
CyclicBarrier.await()
时即代表设置了一个栅栏,不同的线程抵达栅栏的时机可以不同,但是栅栏数必须相同。换言之,不允许线程A调用M次await()
,而线程B调用N次await()
。CyclicBarrier
的构造函数允许一个传递Runnable
参数,当所有线程都到达栅栏时,由最后一个到达的线程执行Runnable
中的逻辑
前文说过,不同的线程抵达栅栏的时机可以不同,即每个线程的run()逻辑都可以不同,只要保证await()的调用次数相同便可,上面的demo为了精简,没有定义不同逻辑的线程。
通过CyclicBarrier
的含义以及使用方法可知,CyclicBarrier
的适用场景一般是需要多个线程协作完成一个任务,但又要保证每个线程按计划执行,不能超前完成。
Phaser
含义
Phaser
可以理解为CyclicBarrier
的进阶版本,它的作用整体上看与CyclicBarrier
大致相同,只是提供了更加精细的线程调度支持。因此Phaser
也是三者中最为复杂的,使用不当则可能带来一些隐患,要格外小心。Phaser
与CyclicBarrier
的主要异同点:
CyclicBarrier
需要在构造函数指定参与的线程数,创建完成后禁止增减线程;Phaser
则无需指定参与的线程数,可以在运行过程中动态增减。CyclicBarrier
只提供了await()
来强制当前线程等待其余线程一起冲破屏障,Phaser
则提供了arrive()
使得当前线程不必等待其余线程而直接进入下一阶段。arrive()
需谨慎使用,避免破坏Phaser
的同步机制与协作效果。Phaser
与CyclicBarrier
一样禁止不同线程的栅栏数不相等,所谓的禁止并非是语法上的限制,而是需要使用者自己避免。
使用
Demo 1
public class PhaserExample {
public static void main(String[] args) {
// Create a Phaser with 3 parties
Phaser phaser = new Phaser(3);
// Create and start 3 threads
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Task(phaser), "Thread " + i);
thread.start();
}
// Wait for the completion of all phases
phaser.awaitAdvance(phaser.getPhase());
System.out.println("All phases are completed.");
}
}
class Task implements Runnable {
private Phaser phaser;
public Task(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
// Perform first task and arrive at the phaser
System.out.println(Thread.currentThread().getName() + " is performing task 1.");
phaser.arriveAndAwaitAdvance();
// Perform second task and arrive at the phaser
System.out.println(Thread.currentThread().getName() + " is performing task 2.");
phaser.arriveAndAwaitAdvance();
// Perform third task and arrive at the phaser
System.out.println(Thread.currentThread().getName() + " is performing task 3.");
phaser.arriveAndAwaitAdvance();
}
}
Demo 2
Phaser phaser = new Phaser();
// 注册一个party
phaser.register();
for (int i = 0; i < 3; i++) {
phaser.register();
CachedThreadPool.execute(() -> {
doFirstPhaseWork();
// 表示当前线程已到达phase-0,阻塞并等待所有线程到达phase-0
phaser.arriveAndAwaitAdvance();
doSecondPhaseWork();
phaser.arriveAndAwaitAdvance();
});
}
phaser.arriveAndAwaitAdvance();
从Demo 1可以看出,Phaser
的使用大致上与CyclicBarrier
相同,arriveAndAwaitAdvance()
相当于CyclicBarrier
的await()
。但Phaser
除了有arriveAndAwaitAdvance()
,还有arrive()
、arriveAndDeregister()
、awaitAdvance()
、awaitAdvanceInterruptibly()
,这些方法为Phaser
带来了比CyclicBarrier
更灵活细致的调度能力。
Phaser动态增减参与者的能力为一个实际业务场景提供了实现思路:一个真实的maven工程应该是由多个模块组成的,我们经常需要等待所有模块都载入完毕后再执行某些动作,此时就可以使用Phaser作为模块计数器,每一个模块开始加载时,就register()一次,由于每个模块加载的成功与否都不能影响其他模块的加载,故加载完毕后调用arrive(),然后在模块加载完毕的后置动作前调用awaitAdvance()来强制等待。
Reference
Phaser 使用介绍
Phaser并发阶段器
Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser