热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

Java多线程同步器代码详解

这篇文章主要介绍了Java多线程同步器代码详解,文章分别介绍了是CountDownLatch,Semaphore,Barrier和Exchanger以及其相关代码示例,具有一定参考价值,需要的朋友可以了解下。

同步器

为每种特定的同步问题提供了解决方案,同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier 和Exchanger

Semaphore

Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。

测试类:

package concurrent;
import concurrent.thread.SemaphoreThread;
import java.util.concurrent.Semaphore;
/**
  * 拿客
  * www.coderknock.com
  * QQ群:213732117
  * 创建时间:2016年08月08日
  * 描述:
  */
public class SemaphoreTest {
	public static void main(String[] args) {
		//在Thread里声明并不是同一个对象
		Semaphore semaphore = new Semaphore(3);
		SemaphoreThread testA = new SemaphoreThread("A", semaphore);
		SemaphoreThread testB = new SemaphoreThread("B", semaphore);
		SemaphoreThread testC = new SemaphoreThread("C", semaphore);
		SemaphoreThread testD = new SemaphoreThread("D", semaphore);
		SemaphoreThread testE = new SemaphoreThread("E", semaphore);
		SemaphoreThread testF = new SemaphoreThread("F", semaphore);
		SemaphoreThread testG = new SemaphoreThread("G", semaphore);
		testA.start();
		testB.start();
		testC.start();
		testD.start();
		testE.start();
		testF.start();
		testG.start();
	}
}

线程写法:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Semaphore;
/**
  * 拿客
  * www.coderknock.com
  * QQ群:213732117
  * 创建时间:2016年08月08日
  * 描述:
  */
public class SemaphoreThread extends Thread {
	private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
	//创建有3个信号量的信号量计数器
	public Semaphore semaphore;
	public SemaphoreThread(String name, Semaphore semaphore) {
		setName(name);
		this.semaphore = semaphore;
	}
	@Override
	    public void run() {
		try {
			logger.debug(getName() + " 取号等待... " + System.currentTimeMillis());
			//取出一个信号
			semaphore.acquire();
			logger.debug(getName() + " 提供服务... " + System.currentTimeMillis());
			sleep(1000);
			logger.debug(getName() + " 完成服务... " + System.currentTimeMillis());
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug(getName() + " 释放... " + System.currentTimeMillis());
		//释放一个信号
		semaphore.release();
	}
}

执行结果【以下所有输出结果中[]中为线程名称- 后为输出的内容】:

  [C] - C 取号等待... 1470642024037
  [F] - F 取号等待... 1470642024036
  [E] - E 取号等待... 1470642024036
  [B] - B 取号等待... 1470642024037
  [D] - D 取号等待... 1470642024037
  [A] - A 取号等待... 1470642023965
  [D] - D 提供服务... 1470642024039
  [C] - C 提供服务... 1470642024039
  [G] - G 取号等待... 1470642024036
  [F] - F 提供服务... 1470642024040
  [D] - D 完成服务... 1470642025039
  [C] - C 完成服务... 1470642025039
  [D] - D 释放... 1470642025040
  [F] - F 完成服务... 1470642025040
  [C] - C 释放... 1470642025041
  [B] - B 提供服务... 1470642025042
  [A] - A 提供服务... 1470642025042
  [F] - F 释放... 1470642025043
  [E] - E 提供服务... 1470642025043
  [A] - A 完成服务... 1470642026043
  [B] - B 完成服务... 1470642026043
  [B] - B 释放... 1470642026043
  [A] - A 释放... 1470642026043
  [G] - G 提供服务... 1470642026044
  [E] - E 完成服务... 1470642026045
  [E] - E 释放... 1470642026045
  [G] - G 完成服务... 1470642027045
  [G] - G 释放... 1470642027046

可以看到,当3个信号量被领取完之后,之后的线程会阻塞在领取信号的位置,当有信号量释放之后才会继续执行。

CountDownLatch

CountDownLatch【倒计时锁】,线程中调用countDownLatch.await()使进程进入阻塞状态,当达成指定次数后(通过countDownLatch.countDown())继续执行每个线程中剩余的内容。

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

  用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

测试类:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;

public class package concurrent;
import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchTest {
	private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);
	public static void main(String[] args) throws InterruptedException {
		//设定当达成三个计数时触发
		CountDownLatch countDownLatch = new CountDownLatch(3);
		new CountDownLatchThread("A", countDownLatch).start();
		new CountDownLatchThread("B", countDownLatch).start();
		new CountDownLatchThread("C", countDownLatch).start();
		new CountDownLatchThread("D", countDownLatch).start();
		new CountDownLatchThread("E", countDownLatch).start();
		for (int i = 3; i > 0; i--) {
			Thread.sleep(1000);
			logger.debug(i);
			countDownLatch.countDown();
		}
	}
}

线程类:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchThread extends Thread {
	private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
	//计数器
	private CountDownLatch countDownLatch;
	public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
		setName(name);
		this.countDownLatch = countDownLatch;
	}
	@Override
	  public void run() {
		logger.debug("执行操作...");
		try {
			sleep(1000);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug("等待计数器达到标准...");
		try {
			//让线程进入阻塞状态,等待计数达成后释放
			countDownLatch.await();
			logger.debug("计数达成,继续执行...");
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

执行结果:

 [E] - 执行操作...
 [B] - 执行操作...
 [A] - 执行操作...
 [C] - 执行操作...
 [D] - 执行操作...
 [main] DEBUG concurrent.CountDownLatchTest - 3
 [B] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [A] - 等待计数器达到标准...
 [main] DEBUG concurrent.CountDownLatchTest - 2
 [main] DEBUG concurrent.CountDownLatchTest - 1
 [E] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...

CyclicBarrier

CyclicBarrier【Cyclic周期,循环的 Barrier屏障,障碍】循环的等待阻塞的线程个数到达指定数量后使参与计数的线程继续执行并可执行特定线程(使用不同构造函数可以不设定到达后执行),其他线程仍处于阻塞等待再一次达成指定个数。

测试类:

package concurrent;
import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
	private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);
	public static void main(String[] args) {
		//可以使用CyclicBarrier(int parties)不设定到达后执行的内容
		CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
			logger.debug("---计数到达后执行的内容----");
		}
		);
		new CyclicBarrierThread("A", cyclicBarrier).start();
		new CyclicBarrierThread("B", cyclicBarrier).start();
		new CyclicBarrierThread("C", cyclicBarrier).start();
		new CyclicBarrierThread("D", cyclicBarrier).start();
		new CyclicBarrierThread("E", cyclicBarrier).start();
		new CyclicBarrierThread("A2", cyclicBarrier).start();
		new CyclicBarrierThread("B2", cyclicBarrier).start();
		new CyclicBarrierThread("C2", cyclicBarrier).start();
		new CyclicBarrierThread("D2", cyclicBarrier).start();
		new CyclicBarrierThread("E2", cyclicBarrier).start();
		//需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程,
		// 那么当达到5个数量时,只会执行达到时的五个线程的内容,
		// 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束
		// new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束
	}
}

线程类:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierThread extends Thread {
	private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);
	private CyclicBarrier cyclicBarrier;
	public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
		super(name);
		this.cyclicBarrier = cyclicBarrier;
	}
	@Override
	  public void run() {
		logger.debug("执行操作...");
		try {
			int time = new Random().nextint(10) * 1000;
			logger.debug("休眠" + time/1000 + "秒");
			sleep(time);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug("等待计数器达到标准...");
		try {
			//让线程进入阻塞状态,等待计数达成后释放
			cyclicBarrier.await();
			logger.debug("计数达成,继续执行...");
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

执行结果:

 [A] - 执行操作...
 [A] - 休眠0秒
 [E2] - 执行操作...
 [E2] - 休眠5秒
 [D2] - 执行操作...
 [D2] - 休眠4秒
 [C2] - 执行操作...
 [C2] - 休眠4秒
 [B2] - 执行操作...
 [B2] - 休眠6秒
 [A2] - 执行操作...
 [A2] - 休眠8秒
 [E] - 执行操作...
 [E] - 休眠5秒
 [D] - 执行操作...
 [D] - 休眠0秒
 [C] - 执行操作...
 [C] - 休眠3秒
 [B] - 执行操作...
 [B] - 休眠7秒
 [A] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D2] - 等待计数器达到标准...
 [C2] - 等待计数器达到标准...
 [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [C2] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [D2] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [E2] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [B2] - 等待计数器达到标准...
 [B] - 等待计数器达到标准...
 [A2] - 等待计数器达到标准...
 [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [E] - 计数达成,继续执行...
 [B2] - 计数达成,继续执行...
 [E2] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [A2] - 计数达成,继续执行...

可以想象成以前不正规的长途汽车站的模式:

不正规的长途汽车站会等待座位坐满之后才发车,到达目的地之后继续等待然后循环进行。每个人都是一个Thread,上车后触发cyclicBarrier.await();,当坐满时就是达到指定达成数的时候,车辆发车就是达成后统一执行的内容,发车后车上的人们就可以聊天之类的操作了【我们暂且理解为上车后人们就都不能动了O(∩_∩)O~】。

CountDownLatch与CyclicBarrier区别:

CountDownLatch是一个或多个线程等待计数达成后继续执行,await()调用并没有参与计数。

CyclicBarrier则是N个线程等待彼此执行到零界点之后再继续执行,await()调用的同时参与了计数,并且CyclicBarrier支持条件达成后执行某个动作,而且这个过程是循环性的。

Exchanger

Exchanger 用于线程间进行数据交换

  可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。  Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

  用法示例:以下是重点介绍的一个类,该类使用 Exchanger 在线程间交换缓冲区,因此,在需要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程。 测试类:

package concurrent;
import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;
import java.util.HashMap;
import java.util.concurrent.Exchanger;

public class ExchangerTest {
	public static void main(String[] args) {
		Exchanger> exchanger = new Exchanger<>();
		new ExchangerThread("A", exchanger).start();
		new ExchangerThread("B", exchanger).start();
	}
}

实体类:

package concurrent.pojo;
import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.List;

public class ExchangerPojo {
	private int intVal;
	private String strVal;
	private List strList;
	private Date date;
	public ExchangerPojo(int intVal, String strVal, List strList, Date date) {
		this.intVal = intVal;
		this.strVal = strVal;
		this.strList = strList;
		this.date = date;
	}
	public int getIntVal() {
		return intVal;
	}
	public void setIntVal(int intVal) {
		this.intVal = intVal;
	}
	public String getStrVal() {
		return strVal;
	}
	public void setStrVal(String strVal) {
		this.strVal = strVal;
	}
	public List getStrList() {
		return strList;
	}
	public void setStrList(List strList) {
		this.strList = strList;
	}
	public Date getDate() {
		return date;
	}
	public void setDate(Date date) {
		this.date = date;
	}
	@Override
	  public String toString() {
		return JSON.toJSONString(this);
	}
}

线程类:

package concurrent.thread;
import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.Exchanger;

public class ExchangerThread extends Thread {
	private Exchanger> exchanger;
	private static final Logger logger = LogManager.getLogger(ExchangerThread.class);
	public ExchangerThread(String name, Exchanger> exchanger) {
		super(name);
		this.exchanger = exchanger;
	}
	@Override
	  public void run() {
		HashMap map = new HashMap<>();
		logger.debug(getName() + "提供者提供数据...");
		Random random = new Random();
		for (int i = 0; i <3; i++) {
			int index = random.nextint(10);
			List list = new ArrayList<>();
			for (int j = 0; j  " + j);
			}
			ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date());
			map.put("第" + i + "个数据", pojo);
		}
		try {
			int time = random.nextint(10);
			logger.debug(getName() + "等待" + time + "秒....");
			for (int i = time; i > 0; i--) {
				sleep(1000);
				logger.debug(getName() + "---->" + i);
			}
			//等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了
			HashMap getMap = exchanger.exchange(map);
			time = random.nextint(10);
			logger.debug(getName() + "接受到数据等待" + time + "秒....");
			for (int i = time; i > 0; i--) {
				sleep(1000);
				logger.debug(getName() + "---->" + i);
			}
			getMap.forEach((x, y) -> {
				logger.debug(x + " -----> " + y.toString());
			}
			);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

执行结果:

 [B] - B提供者提供数据...
 [A] - A提供者提供数据...
 [A] - A等待2秒....
 [B] - B等待0秒....
 [A] - A---->2
 [A] - A---->1
 [B] - B接受到数据等待1秒....
 [A] - A接受到数据等待4秒....
 [B] - B---->1
 [A] - A---->4
 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"}
 [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"}
 [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"}
 [A] - A---->3
 [A] - A---->2
 [A] - A---->1
 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"}
 [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}
 [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}

Phaser

Phaser个人感觉兼具了CountDownLatch与CyclicBarrier的功能,并提供了分阶段的能力。

实现分阶段的CyclicBarrier的功能

测试代码:

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;

public class PhaserTest {
	private static final Logger logger = LogManager.getLogger(PhaserTest.class);
	public static void main(String[] args) {
		Phaser phaser = new Phaser() {
			/**此方法有2个作用:
       * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
       * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
       * */
			@Override
			      protected Boolean onAdvance(int phase, int registeredParties) {
				logger.debug("阶段--->" + phase);
				logger.debug("注册的线程数量--->" + registeredParties);
				return super.onAdvance(phase, registeredParties);
			}
		}
		;
		for (int i = 3; i > 0; i--) {
			new PhaserThread("第" + i + "个", phaser).start();
		}
	}
}

线程代码:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.Phaser;

public class PhaserThread extends Thread {
	private Phaser phaser;
	private static final Logger logger = LogManager.getLogger(PhaserThread.class);
	public PhaserThread(String name, Phaser phaser) {
		super(name);
		this.phaser = phaser;
		//把当前线程注册到Phaser
		this.phaser.register();
		logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程");
	}
	@Override
	  public void run() {
		logger.debug("进入...");
		phaser.arrive();
		for (int i = 6; i > 0; i--) {
			int time = new Random().nextint(5);
			try {
				logger.debug("睡眠" + time + "秒");
				sleep(time * 1000);
				if (i == 1) {
					logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
					logger.debug("最后一次触发,并注销自身");
					phaser.arriveAndDeregister();
					logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
				} else {
					logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
					logger.debug(i + "--->触发并阻塞...");
					phaser.arriveAndAwaitAdvance();
					//相当于CyclicBarrier.await();
					logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
				}
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties());
	}
}

执行结果:

 [main] - name为第3个的线程注册了1个线程
 [main] - name为第2个的线程注册了2个线程
 [main] - name为第1个的线程注册了3个线程
 [第3个] - 进入...
 [第2个] - 进入...
 [第3个] - 睡眠2秒
 [第2个] - 睡眠1秒
 [第1个] - 进入...
 [第1个] - 阶段--->0
 [第1个] - 注册的线程数量--->3
 [第1个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 6--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第3个] - 6--->触发并阻塞...
 [第1个] - 未完成的线程数量:1
 [第1个] - 6--->触发并阻塞...
 [第1个] - 阶段--->1
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠1秒
 [第3个] - 睡眠0秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 5--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 5--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 5--->触发并阻塞...
 [第2个] - 阶段--->2
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠0秒
 [第3个] - 睡眠2秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第2个] - 4--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第1个] - 未完成的线程数量:2
 [第3个] - 4--->触发并阻塞...
 [第1个] - 4--->触发并阻塞...
 [第1个] - 阶段--->3
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第3个] - 睡眠1秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 3--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 3--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 3--->触发并阻塞...
 [第2个] - 阶段--->4
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠2秒
 [第1个] - 睡眠2秒
 [第3个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 2--->触发并阻塞...
 [第1个] - 2--->触发并阻塞...
 [第3个] - 未完成的线程数量:1
 [第3个] - 2--->触发并阻塞...
 [第3个] - 阶段--->5
 [第3个] - 注册的线程数量--->3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第3个] - 睡眠2秒
 [第1个] - 睡眠3秒
 [第2个] - 睡眠0秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 最后一次触发,并注销自身
 [第2个] - 未完成的线程数量:2
 [第2个] - 注销完成之后注册的线程数量--->2
 [第3个] - 未完成的线程数量:2
 [第3个] - 最后一次触发,并注销自身
 [第3个] - 未完成的线程数量:1
 [第3个] - 注销完成之后注册的线程数量--->1
 [第1个] - 未完成的线程数量:1
 [第1个] - 最后一次触发,并注销自身
 [第1个] - 阶段--->6
 [第1个] - 注册的线程数量--->0
 [第1个] - 未完成的线程数量:0
 [第1个] - 注销完成之后注册的线程数量--->0

上面代码中,当所有线程进行到arriveAndAwaitAdvance()时会触发计数并且将线程阻塞,等计数数量等于注册线程数量【即所有线程都执行到了约定的地方时,会放行,是所有线程得以继续执行,并触发onAction事件】。我们可以在onAction中根据不同阶段执行不同内容的操作。

实现分阶段的CountDownLatch的功能

只需将上面的测试类更改如下:

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
import static jodd.util.ThreadUtil.sleep;

public class PhaserTest {
	private static final Logger logger = LogManager.getLogger(PhaserTest.class);
	public static void main(String[] args) {
		//这里其实相当于已经注册了3个线程,但是并没有实际的线程
		int coutNum=3;
		Phaser phaser = new Phaser(coutNum) {
			/**此方法有2个作用:
       * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
       * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
       * */
			@Override
			      protected Boolean onAdvance(int phase, int registeredParties) {
				logger.debug("阶段--->" + phase);
				logger.debug("注册的线程数量--->" + registeredParties);
				return registeredParties==coutNum;
				//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser
			}
		}
		;
		for (int i = 3; i > 0; i--) {
			new PhaserThread("第" + i + "个", phaser).start();
		}
		//当phaser未终止时循环注册这块儿可以使用实际的业务处理
		while (!phaser.isTerminated()) {
			sleep(1000);
			logger.debug("触发一次");
			phaser.arrive();
			//相当于countDownLatch.countDown();
		}
	}
}

 总结

以上就是本文关于Java多线程同步器代码详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:

Java多线程中断机制三种方法及示例

浅谈Java多线程处理中Future的妙用(附源码)

Java通过卖票理解多线程

如有不足之处,欢迎留言指出。


推荐阅读
  • 本文介绍了 PHP 的基本概念、服务器与客户端的工作原理,以及 PHP 如何与数据库交互。同时,还涵盖了常见的数据库操作和安全性问题。 ... [详细]
  • 本打算教一步步实现koa-router,因为要解释的太多了,所以先简化成mini版本,从实现部分功能到阅读源码,希望能让你好理解一些。希望你之前有读过koa源码,没有的话,给你链接 ... [详细]
  • 文章目录python包-requests关于requests包安装和使用pythonrequests请求超时设置工作中遇到的常见问题整理访问https网站,报错cer ... [详细]
  • ABP框架是ASP.NET Boilerplate的简称,它不仅是一个开源且文档丰富的应用程序框架,还提供了一套基于领域驱动设计(DDD)的最佳实践架构模型。本文将详细介绍ABP框架的特点、项目结构及其在Web API优先架构中的应用。 ... [详细]
  • 作为一名新手开发者,我正在尝试使用 ASP.NET 和 Vue.js 构建一个单页面应用,涉及多个复杂组件(如按钮、图表等)。希望有经验的开发者能够提供指导。 ... [详细]
  • 本文介绍了如何通过 AJAX 发送请求到后端控制器,并将返回的 JSON 数据解析后在前端页面上显示。具体步骤包括发送 AJAX 请求、解析 JSON 字符串和遍历数据。 ... [详细]
  • 本文介绍了在Java中遍历HashMap的三种常见方法:使用entrySet()、keySet()以及Java 8引入的forEach。每种方法都有其特点和适用场景。 ... [详细]
  • Python环境中字体放大的解决方法
    在使用Python开发环境时,有时会遇到无法通过Ctrl+鼠标滚轮放大字体的问题。本文将介绍如何在不同环境下解决这一问题,包括在没有Settings选项的情况下的替代方案。 ... [详细]
  • 优先队列是一种特殊的队列,不遵循先进先出原则。它分为最大优先队列和最小优先队列。最大优先队列总是将当前最大的元素优先出队,而最小优先队列则总是将当前最小的元素优先出队。本文将详细介绍如何使用二叉堆在C#中实现这两种优先队列。 ... [详细]
  • 近年来,区块链技术备受关注,其中比特币(Bitcoin)功不可没。尽管数字货币的概念早在上个世纪就被提出,但直到比特币的诞生,这一概念才真正落地生根。本文将详细探讨比特币、以太坊和超级账本(Hyperledger)的核心技术和应用场景。 ... [详细]
  • 【转】强大的矩阵奇异值分解(SVD)及其应用
    在工程实践中,经常要对大矩阵进行计算,除了使用分布式处理方法以外,就是通过理论方法,对矩阵降维。一下文章,我在 ... [详细]
  • 今日深入研究了树状数组,感觉难度较大,通过课件和博客辅助学习,仍有许多疑惑。主要探讨了老师推荐的三道题目,初步掌握了树状数组的基本用法。同时,还学习了逆序数和离散化的概念及其应用。 ... [详细]
  • 自动驾驶中的9种传感器融合算法
    来源丨AI修炼之路在自动驾驶汽车中,传感器融合是融合来自多个传感器数据的过程。该步骤在机器人技术中是强制性的,因为它提供了更高的可靠性、冗余性以及最终的 ... [详细]
  • 欧拉法与龙格-库塔法在微分方程求解中的对比分析
    本文探讨了计算机如何理解和模拟连续系统的动态特性,重点介绍了欧拉法和龙格-库塔法这两种常用的数值积分方法。通过详细的理论分析和MATLAB代码实现,对比了两种方法在求解微分方程时的性能和适用性。 ... [详细]
  • 本文详细介绍了Dijkstra算法,该算法用于解决图中从单个源点到其他所有顶点的最短路径问题。 ... [详细]
author-avatar
xao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有