在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。在 CountDownLatch 出现之前一般都使用线程 join() 方法来实现这一点,但是 join 方法不够灵活,不能满足不同场景的需要,所以 JDK 开发提供了 CountDownLatch 这个类,使用 CountDownLatch 代码如下:
// 计数器:判断线程池的任务是否已经全部执行完import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class CountDownLatchDemo1 {public static void main(String[] args) throws InterruptedException {// 创建计数器CountDownLatch countDownLatch &#61; new CountDownLatch(5);// 创建新线程执行任务for (int i &#61; 0; i < 5; i&#43;&#43;) {new Thread(() -> {Thread currThread &#61; Thread.currentThread();System.out.println(currThread.getName() &#43; "开始执行");// 线程执行所用时间int runTime &#61; (1 &#43; new Random().nextInt(5));try {TimeUnit.SECONDS.sleep(runTime);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(currThread.getName() &#43; "执行完成&#xff0c;用时" &#43; runTime);//计数器-1countDownLatch.countDown();},"线程" &#43; i&#43;"-> ").start();}countDownLatch.await(); // 阻塞等待&#xff0c;直到所有线程执行完System.out.println("执行结果");}
}
在如上代码中&#xff0c;创建了一个 CountDownLatch 实例&#xff0c;用 for 循环创建 5 个线程&#xff0c;所以给构造函数传递参数为 5。主线程调用 countDownLatch.await() 方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown() 方法让countDownLatch 内部的计数器减 1&#xff0c;所有子线程执行完毕后并调用 countDown() 方法后计数器会变为 0&#xff0c;这时候主线程的 await() 方法才会返回。
以上代码是用直接循环创建 5 个线程实现的&#xff0c;其实在项目实践中一般都避免直接操作线程&#xff0c;而是使用 ExecutorService 线程池来管理&#xff0c;使用 ExecutorService 时传递的参数是 Runnable 或者 Callable 对象&#xff0c;这时候你就没有办法调用这些线程的 join() 方法&#xff0c;这就需要选择使用 CountDownLatch 了&#xff0c;将上面代码修改如下&#xff1a;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CountDownLatchDemo2 {// 创建计数器private static CountDownLatch countDownLatch &#61; new CountDownLatch(5);public static void main(String[] args) throws InterruptedException {// 创建线程池ExecutorService executorService &#61; Executors.newFixedThreadPool(5);for (int i &#61; 0; i < 5; i&#43;&#43;) {executorService.submit(new Runnable() {&#64;Overridepublic void run() {Thread currThread &#61; Thread.currentThread();System.out.println(currThread.getName() &#43; "开始执行");// 线程执行所用时间int runTime &#61; (1 &#43; new Random().nextInt(5));try {TimeUnit.SECONDS.sleep(runTime);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(currThread.getName() &#43; "执行完成&#xff0c;用时" &#43; runTime);//计数器-1countDownLatch.countDown();}});}countDownLatch.await(); // 阻塞等待&#xff0c;直到所有线程执行完System.out.println("执行结果");}
}
一个区别是&#xff0c;调用一个子线程的 join() 方法后&#xff0c;该线程会一直被阻塞直到子线程执行完毕&#xff0c;而 CountDownLatch 可以在子线程运行的任何时候让 await 方法返回而不一定必须等到线程结束。另外&#xff0c;使用线程池来管理线程时一般都是直接添加 Runnable 到线程池&#xff0c;这时候就没有办法再调用线程的 join 方法了&#xff0c;就是说 countDownLatch 相比 join 方法让我们对线程同步有更灵活的控制。
上面介绍的 CountDownLatch 在解决多个线程同步方面相对于调用 join 方法已经有了不少优化&#xff0c;但是 CountDownLatch 的计数器是一次性的&#xff0c;也就是等到计数器变为 0 后&#xff0c;再调用 CountDownLatch 的 await 和 countdownLatch 方法都会立即返回&#xff0c;这就起不到线程同步的效果了。所以为了满足计数器可以重置的需要&#xff0c;JDK 开发组提供了 CyclicBarrier 类&#xff0c;并且 CyclicBarrier 类的功能并不限于 CountDownLatch 的功能。从字面意思理解&#xff0c;CyclicBarrier 是回环屏障的意思&#xff0c;它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕&#xff0c;并重置 CyclicBarrier 的状态后它可以被重用。之所以叫做屏障是因为所有线程调用 await 方法后会被阻塞&#xff0c;这个阻塞点就称为屏蔽点&#xff0c;等所有线程都调用了 await 方法后&#xff0c;线程们就会冲破屏障&#xff0c;继续向下执行。
下面是一个演示&#xff1a;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;// 循环屏障
public class CyclicBarrierDemo1 {public static void main(String[] args) {// 循环屏障CyclicBarrier cyclicBarrier &#61; new CyclicBarrier(4, new Runnable() {&#64;Overridepublic void run() {System.out.println("计数器为 0 了");}});// 创建线程池ExecutorService service &#61; Executors.newFixedThreadPool(4);for (int i &#61; 0; i < 4; i&#43;&#43;) {int finalI &#61; i;service.submit(() -> {Thread currThread &#61; Thread.currentThread();System.out.println("执行线程&#xff1a;" &#43; currThread.getName());try {Thread.sleep(500 * finalI);cyclicBarrier.await(); // 执行阻塞等待&#xff08;直到循环屏障的计数器为0的时候&#xff0c;再执行后面的代码&#xff09;} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程执行完成:"&#43;currThread.getName());});}}
}
上面代码中&#xff0c;每个子线程在开始执行后都调用了 await 方法&#xff0c;等到所有线程都到达屏障点后才会一块往下执行&#xff0c;这就保证了所有线程的阶段性执行。
Semaphore 信号量也是 Java 中的一个同步器&#xff0c;与 CountDownLatch 和 CycleBarrier 不同的是&#xff0c;它内部的计数器是递增的&#xff0c;并且在一开始初始化 Semaphore 时可以指定一个初始值&#xff0c;但是并不需要知道同步的线程个数&#xff0c;而是在需要同步的地方调用 acquire 方法时指定需要同步的线程个数。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreDemo2 {private static Semaphore semaphore &#61; new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService &#61; Executors.newFixedThreadPool(2);for (int i &#61; 0; i < 2; i&#43;&#43;) {executorService.submit(new Runnable() {&#64;Overridepublic void run() {System.out.println(Thread.currentThread()&#43; "开始执行");semaphore.release();}});}// 等待子线程执行完毕&#xff0c;返回semaphore.acquire(2);System.out.println("所有线程执行完毕");// 关闭线程池executorService.shutdown();}
}
如上代码首先创建了一个信号量实例&#xff0c;构造函数的入参为 0&#xff0c;说明当前信号量计数器值为 0。然后 mian 函数向线程池添加两个线程任务&#xff0c;在每个线程内部调用信号量的 release 方法&#xff0c;这相当于让计数器值递增 1。最后在 main线程里面调用信号量的 acquire 方法&#xff0c;传参为 2 说明调用 acquire 方法的线程会一直阻塞&#xff0c;知道信号量的计数变为 2 才会返回。看到这里也就明白了&#xff0c;如果构造方法 Semaphore 时传递的参数为 N&#xff0c;并在 M 个线程中调用了该信号量的 release 方法&#xff0c;那么在调用 acquire 使 M 个线程同步时传递的参数应该是 M&#43;N。
此文介绍了并发包中关于线程协作的一些重要类。首先 CountDownLatch 通过计数器提供了更灵活的控制&#xff0c;只要检测到计数器值为 0&#xff0c;就可以往下执行&#xff0c;这相比于 join 必须等待线程执行完毕后主线程才会继续向下运行更灵活。另外&#xff0c;CyclicBarrier 也可以达到 CountDownLatch 的效果&#xff0c;但是后者在计数器值变为 0 后&#xff0c;就不能再被复用&#xff0c;而前者则可以使用 reset 方法重置后复用&#xff0c;前者对同一个算法但是输入参数不同的类似场景比较使用。而 Semaphore 采用了信号量递增的策略&#xff0c;一开始并不需要关心同步的线程个数&#xff0c;等调用 aquire 方法时再指定需要同步的个数&#xff0c;并且提供了获取信号量的公平性策略。