热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

并发包中的CyclicBarrier同步工具

一、CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(commonbarrierpoint)。CyclicBa

一、CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

CyclicBarrier循环栅栏,Cyclic意味循环,也就是这个计数器可以反复使用。



CyclicBarrier 支持一个可选的 Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。



CyclicBarrier适用于这样的情况:创建一组任务,并行执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成(看起来有些像join())。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动,这非常像CountDownLatch,只是CountDownLatch只触发一次的时间,而 CyclicBarrier可以多次重用




1.1 API


API描述
CyclicBarrier(int parties)创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
int await()在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int getNumberWaiting()返回当前在屏障处等待的参与者数目。
void reset()将屏障重置为其初始状态

示例用法:下面是一个在并行分解设计中使用 barrier 的例子(伪代码):


class Solver {final int N;final float[][] data;final CyclicBarrier barrier;class Worker implements Runnable {int myRow;Worker(int row) { myRow = row; }public void run() {while (!done()) {processRow(myRow);try {barrier.await();} catch (InterruptedException ex) {return;} catch (BrokenBarrierException ex) {return;}}}}public Solver(float[][] matrix) {data = matrix;N = matrix.length;barrier = new CyclicBarrier(N,new Runnable() {public void run() {mergeRows(...);}});for (int i = 0; i }



在这个例子中,每个worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么done()将返回 true,所有的worker 线程都将终止。





1.2 案例分析

案例一:


CyclicBarrier使得每匹马都执行为了向前移动所必需执行的所有工作,然后必须在栅栏处等待其他所有的马都准备完毕。 当所有的马都向前移动时,CyclicBarrier将自动调用Runnable栅栏动作任务,按顺序显示马和终点线的位置。


一旦所有的任务都越过栅栏,它就会自动地为下一回合比赛做好准备。(可以拷贝运行查看运行结果)

package test;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;class Horse implements Runnable {private static int counter = 0;private final int id = counter++; //定义ID值private int strides = 0;private static Random random = new Random(47);private static CyclicBarrier cyclicBarrier;public Horse(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}public synchronized int getStrides() {return strides;}@Overridepublic void run() {try {while (!Thread.interrupted()) {synchronized (this) { //使用this,表示当前对象strides += random.nextInt(3);//随机获取一个值。表示长度值}cyclicBarrier.await(); //一直等待,当计数值达到0值时执行一次Runnable中的 run()}} catch (InterruptedException e) {} catch (BrokenBarrierException e) {throw new RuntimeException();}}public String toString() {return "Horse " + id + " ";}public String tracks() {StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i "*");}stringBuilder.append(id);return stringBuilder.toString();}
}public class HorseRace {static final int FINISH_LINE &#61; 75; //完成的步长private List horses &#61; new ArrayList<>();private ExecutorService executorService &#61; Executors.newCachedThreadPool();private CyclicBarrier cyclicBarrier;//在构造函数public HorseRace(int nHorses, final int pause) {// 在构造函数中给CyclicBarrier赋值&#xff0c;//在一组线程中的最后一个线程到达之后&#xff08;但在释放所有线程之前&#xff09;&#xff0c;该命令只在每个屏障点运行一次cyclicBarrier &#61; new CyclicBarrier(nHorses, new Runnable() {&#64;Overridepublic void run() {StringBuilder stringBuilder &#61; new StringBuilder();for (int i &#61; 0; i "&#61;");}System.out.println(stringBuilder);for (Horse horse : horses) {System.out.println(horse.tracks());}for (Horse horse : horses) {if (horse.getStrides() >&#61; FINISH_LINE) {System.out.println(horse &#43; " won");executorService.shutdownNow();return;}}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.out.println("barrier-action sleep interruptedException");}}});// 启动比赛for (int i &#61; 0; i new Horse(cyclicBarrier);horses.add(horse);executorService.execute(horse); //启动线程}}public static void main(String[] args) {int nHorse &#61; 7;int pause &#61; 200;new HorseRace(nHorse, pause);//第一个参数设置马的数量&#xff0c;第二个参数设置暂停时间}
}

可以向CyclicBarrier提供一个动作&#xff0c;它是一个Runnable,当计数值达到0时&#xff0c;自动执行&#xff0c;这是CyclicBarrierCountDownLatch之间的另外一个区别。;



案例2


其他案例情况。


public class CyclicBarrierTest {public static void main(String[] args) {ExecutorService service &#61; Executors.newCachedThreadPool();final CyclicBarrier cb &#61; new CyclicBarrier(3);for(int i&#61;0;i<3;i&#43;&#43;){Runnable runnable &#61; new Runnable(){public void run(){try {Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点1&#xff0c;当前已有" &#43; (cb.getNumberWaiting()&#43;1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候")); cb.await();Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点2&#xff0c;当前已有" &#43; (cb.getNumberWaiting()&#43;1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候"));cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" &#43; Thread.currentThread().getName() &#43; "即将到达集合地点3&#xff0c;当前已有" &#43; (cb.getNumberWaiting() &#43; 1) &#43; "个已经到达&#xff0c;" &#43; (cb.getNumberWaiting()&#61;&#61;2?"都到齐了&#xff0c;继续走啊":"正在等候")); cb.await(); } catch (Exception e) {e.printStackTrace();} }};service.execute(runnable);}service.shutdown();}
}

运行结果

线程pool-1-thread-2即将到达集合地点1&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点1&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-3即将到达集合地点1&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊
线程pool-1-thread-3即将到达集合地点2&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-2即将到达集合地点2&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点2&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊
线程pool-1-thread-3即将到达集合地点3&#xff0c;当前已有1个已经到达&#xff0c;正在等候
线程pool-1-thread-2即将到达集合地点3&#xff0c;当前已有2个已经到达&#xff0c;正在等候
线程pool-1-thread-1即将到达集合地点3&#xff0c;当前已有3个已经到达&#xff0c;都到齐了&#xff0c;继续走啊



1.2.1 CyclicBarrier 异常问题

Cyclicbarrier.await()方法可能会抛出两个异常&#xff0c;一个是InterruptedException,也就是等待过程中&#xff0c;线程被中断&#xff0c;这是很常见的异常。 大部分迫使线程等待的方法都可能会抛出这个异常&#xff0c;使得线程在等待时依然可以响应外部紧急事件。

另外一个异常则是CyclicBarrier特有的BrokenbarrierException。一旦遇到这个异常&#xff0c;则表示当前的CyclicBarrier已经破损&#xff0c;可能系统已经没有办法等待所有线程都到齐了,所以就撤销所有线程。




参考


  1. 张孝祥-Java多线程与并发库高级应用
  2. 《java编程思想》
  3. 《实战Java高并发程序设计》

推荐阅读
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 这篇文章主要介绍了Python拼接字符串的七种方式,包括使用%、format()、join()、f-string等方法。每种方法都有其特点和限制,通过本文的介绍可以帮助读者更好地理解和运用字符串拼接的技巧。 ... [详细]
  • 网络请求模块选择——axios框架的基本使用和封装
    本文介绍了选择网络请求模块axios的原因,以及axios框架的基本使用和封装方法。包括发送并发请求的演示,全局配置的设置,创建axios实例的方法,拦截器的使用,以及如何封装和请求响应劫持等内容。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文探讨了C语言中指针的应用与价值,指针在C语言中具有灵活性和可变性,通过指针可以操作系统内存和控制外部I/O端口。文章介绍了指针变量和指针的指向变量的含义和用法,以及判断变量数据类型和指向变量或成员变量的类型的方法。还讨论了指针访问数组元素和下标法数组元素的等价关系,以及指针作为函数参数可以改变主调函数变量的值的特点。此外,文章还提到了指针在动态存储分配、链表创建和相关操作中的应用,以及类成员指针与外部变量的区分方法。通过本文的阐述,读者可以更好地理解和应用C语言中的指针。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
author-avatar
tina1314520hqg_552
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有