作者:tina1314520hqg_552 | 来源:互联网 | 2023-09-25 15:23
一、CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
CyclicBarrier
循环栅栏,Cyclic
意味循环,也就是这个计数器可以反复使用。
CyclicBarrier
支持一个可选的 Runnable
命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。
CyclicBarrier
适用于这样的情况:创建一组任务,并行执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成(看起来有些像join()
)。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动,这非常像CountDownLatch
,只是CountDownLatch
只触发一次的时间,而 CyclicBarrier
可以多次重用
1.1 API
API | 描述 |
---|
CyclicBarrier(int parties) | 创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 |
CyclicBarrier(int parties, Runnable barrierAction) | 创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 |
int await() | 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。 |
int getNumberWaiting() | 返回当前在屏障处等待的参与者数目。 |
void reset() | 将屏障重置为其初始状态 |
示例用法:下面是一个在并行分解设计中使用 barrier 的例子(伪代码):
class Solver {final int N;final float[][] data;final CyclicBarrier barrier;class Worker implements Runnable {int myRow;Worker(int row) { myRow = row; }public void run() {while (!done()) {processRow(myRow);try {barrier.await();} catch (InterruptedException ex) {return;} catch (BrokenBarrierException ex) {return;}}}}public Solver(float[][] matrix) {data = matrix;N = matrix.length;barrier = new CyclicBarrier(N,new Runnable() {public void run() {mergeRows(...);}});for (int i = 0; i }
在这个例子中,每个worker
线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么done()
将返回 true,所有的worker
线程都将终止。
1.2 案例分析
案例一:
CyclicBarrier
使得每匹马都执行为了向前移动所必需执行的所有工作,然后必须在栅栏处等待其他所有的马都准备完毕。 当所有的马都向前移动时,CyclicBarrier
将自动调用Runnable
栅栏动作任务,按顺序显示马和终点线的位置。
一旦所有的任务都越过栅栏,它就会自动地为下一回合比赛做好准备。(可以拷贝运行查看运行结果)
package test;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;class Horse implements Runnable {private static int counter = 0;private final int id = counter++; private int strides = 0;private static Random random = new Random(47);private static CyclicBarrier cyclicBarrier;public Horse(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}public synchronized int getStrides() {return strides;}@Overridepublic void run() {try {while (!Thread.interrupted()) {synchronized (this) { strides += random.nextInt(3);}cyclicBarrier.await(); }} catch (InterruptedException e) {} catch (BrokenBarrierException e) {throw new RuntimeException();}}public String toString() {return "Horse " + id + " ";}public String tracks() {StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i "*");}stringBuilder.append(id);return stringBuilder.toString();}
}public class HorseRace {static final int FINISH_LINE &#61; 75; private List horses &#61; new ArrayList<>();private ExecutorService executorService &#61; Executors.newCachedThreadPool();private CyclicBarrier cyclicBarrier;public HorseRace(int nHorses, final int pause) {cyclicBarrier &#61; new CyclicBarrier(nHorses, new Runnable() {&#64;Overridepublic void run() {StringBuilder stringBuilder &#61; new StringBuilder();for (int i &#61; 0; i "&#61;");}System.out.println(stringBuilder);for (Horse horse : horses) {System.out.println(horse.tracks());}for (Horse horse : horses) {if (horse.getStrides() >&#61; FINISH_LINE) {System.out.println(horse &#43; " won");executorService.shutdownNow();return;}}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.out.println("barrier-action sleep interruptedException");}}});for (int i &#61; 0; i new Horse(cyclicBarrier);horses.add(horse);executorService.execute(horse); }}public static void main(String[] args) {int nHorse &#61; 7;int pause &#61; 200;new HorseRace(nHorse, pause);}
}
可以向CyclicBarrier
提供一个动作&#xff0c;它是一个Runnable
,当计数值达到0时&#xff0c;自动执行&#xff0c;这是CyclicBarrier
和CountDownLatch
之间的另外一个区别。;
案例2
其他案例情况。
public class CyclicBarrierTest {public static void main(String[] args) {ExecutorService service &#61; Executors.newCachedThreadPool()final CyclicBarrier cb &#61; new CyclicBarrier(3)for(int i&#61;0Runnable runnable &#61; new Runnable(){public void run(){try {Thread.sleep((long)(Math.random()*10000))System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点1&#xff0c;当前已有" &#43; (cb.getNumberWaiting()&#43;1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候"))cb.await()Thread.sleep((long)(Math.random()*10000))System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点2&#xff0c;当前已有" &#43; (cb.getNumberWaiting()&#43;1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候"))cb.await()Thread.sleep((long)(Math.random()*10000))System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点3&#xff0c;当前已有" &#43; (cb.getNumberWaiting() &#43; 1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候"))cb.await()} catch (Exception e) {e.printStackTrace()} }}service.execute(runnable)}service.shutdown()}
}
运行结果
线程pool-1-thread-2即将到达集合地点1&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点1&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-3即将到达集合地点1&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊
线程pool-1-thread-3即将到达集合地点2&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-2即将到达集合地点2&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点2&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊
线程pool-1-thread-3即将到达集合地点3&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-2即将到达集合地点3&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点3&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊
1.2.1 CyclicBarrier 异常问题
Cyclicbarrier.await()
方法可能会抛出两个异常&#xff0c;一个是InterruptedException
,也就是等待过程中&#xff0c;线程被中断&#xff0c;这是很常见的异常。 大部分迫使线程等待的方法都可能会抛出这个异常&#xff0c;使得线程在等待时依然可以响应外部紧急事件。
另外一个异常则是CyclicBarrier
特有的BrokenbarrierException
。一旦遇到这个异常&#xff0c;则表示当前的CyclicBarrier已经破损&#xff0c;可能系统已经没有办法等待所有线程都到齐了,所以就撤销所有线程。
参考
- 张孝祥-Java多线程与并发库高级应用
- 《java编程思想》
- 《实战Java高并发程序设计》