队列先进先出的一种数据结构,那什么是阻塞队列呢?
从名字可以看出阻塞队列其实也就是队列的一种特殊情况。
从上面这张图我们会发现这样的规律:
(1)当阻塞队列为空时,从队列中获取元素的操作将会被阻塞,就好比餐馆休息区没人了,此时不能接纳新的顾客了。换句话,肚子为空的时候也没东西吃。
(2)当阻塞队列满了,往队列添加元素的操作将会被阻塞,好比餐馆的休息区也挤满了,后来的顾客只能走了。
从上面的概念我们类比到线程中去,我们会发现,在某些时候线程可能不能不阻塞,因为CPU内核就那么几个,阻塞现状更加说明了资源的利用率高,换句话来说,阻塞其实是一个好事。
阻塞队列应用最广泛的是生产者和消费者模式。在没有阻塞队列前是这样子的,
版本1:synchronized ,wait,notifyAll 配合实现线程阻塞、唤醒
class XiaoFeiSyn{private int number=0;public synchronized void product() throws Exception {try {while (number != 0) {//阻塞this.wait();}number++;System.out.println(Thread.currentThread().getName() + " 生产消息...");this.notifyAll();}catch (Exception e) {e.printStackTrace();}}public synchronized void consume () throws Exception {try {while (number == 0) {//阻塞this.wait();}number--;System.out.println(Thread.currentThread().getName() + " 消费消息...");this.notifyAll();}catch (Exception e) {e.printStackTrace();}}
}public static void test6(){XiaoFeiSyn fei&#61;new XiaoFeiSyn();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {fei.product();}}catch (Exception e) {e.printStackTrace();}finally {}}, "AA").start();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {fei.consume();}}catch (Exception e) {e.printStackTrace();}finally {}}, "BB").start();}//运行结果&#xff1a;AA 生产消息...BB 消费消息...AA 生产消息...BB 消费消息...AA 生产消息...BB 消费消息...AA 生产消息...BB 消费消息...AA 生产消息...BB 消费消息...
分析可知&#xff1a;两个线程可交替执行
版本2&#xff1a;Lock ,Condition ,await,signalAll 配合
class XiaoFei{private int number&#61;0;private Lock lock&#61;new ReentrantLock();private Condition condition&#61;lock.newCondition();public void product() throws Exception{try{lock.lock();while (number!&#61;0){//阻塞condition.await();}number&#43;&#43;;System.out.println(Thread.currentThread().getName()&#43;" 生产消息...");condition.signalAll();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}//XiaoFei fei&#61;new XiaoFei();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {fei.product();}}catch (Exception e) {e.printStackTrace();}finally {}}, "AA").start();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {fei.consume();}}catch (Exception e) {e.printStackTrace();}finally {}}, "BB").start();
分析可知&#xff1a;两个线程可交替执行
版本3&#xff1a;基于阻塞队列实现&#xff0c;BlockingQueue
class QueueProAndConsume{public volatile boolean flag&#61;true;BlockingQueue<String> bq&#61;null;AtomicInteger atmoic&#61;new AtomicInteger();public QueueProAndConsume(BlockingQueue<String> bq) {this.bq &#61; bq;System.out.println("实现类&#xff1a;"&#43;bq.getClass().getName());}public void product() throws Exception{String data&#61;null;while (flag){//生产data&#61;atmoic.getAndIncrement()&#43;"";boolean pro&#61;bq.offer(data,2L,TimeUnit.SECONDS);if(pro){System.out.println(Thread.currentThread().getName()&#43;" 生产成功。。"&#43;data);}else {System.out.println(Thread.currentThread().getName()&#43;" 生产失败。。"&#43;data);}}}public void consume() throws Exception{String data&#61;null;while (flag){//消费String consume&#61;bq.poll(2L,TimeUnit.SECONDS);if(consume&#61;&#61;null){flag&#61;false;System.out.println(Thread.currentThread().getName()&#43;" 超过2秒没有取到消息-");}TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()&#43;" 消费&#xff1a;"&#43;consume);}}
QueueProAndConsume queue&#61;new QueueProAndConsume(new SynchronousQueue<>());new Thread(()->{try {queue.product();}catch (Exception e) {e.printStackTrace();}},"product-").start();new Thread(()->{try {queue.consume();}catch (Exception e) {e.printStackTrace();}},"consume-").start();TimeUnit.SECONDS.sleep(10);queue.flag&#61;false;//运行结果&#xff1a;实现类&#xff1a;java.util.concurrent.SynchronousQueueproduct- 生产成功。。0consume- 消费&#xff1a;0product- 生产成功。。1consume- 消费&#xff1a;1product- 生产成功。。2consume- 消费&#xff1a;2product- 生产成功。。3consume- 消费&#xff1a;3product- 生产失败。。4
分析可知&#xff1a;BlockingQueue 没加任何锁、wait 唤醒。也实现了线程的唤醒和阻塞
1、使用了资源类 2、while 防止虚假唤醒 3、volatile boolean flag&#61;true; 线程间可见
为啥呢 &#xff1f;
通过上面代码认识了阻塞队列&#xff0c;接着进一步熟悉阻塞API
SynchronousQueue 队列&#xff0c;无容量、每次put之前必须等待上一个消费掉&#xff0c;
一种阻塞队列&#xff0c;其中每个插入操作必须等待另一个线程的对应移除操作 &#xff0c;反之亦然。同步队列没有任何内部容量&#xff0c;甚至连一个队列的容量都没有。不能在同步队列上进行 peek&#xff0c;因为仅在试图要移除元素时&#xff0c;该元素才存在&#xff1b;除非另一个线程试图移除某个元素&#xff0c;否则也不能&#xff08;使用任何方法&#xff09;插入元素&#xff1b;也不能迭代队列&#xff0c;因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素&#xff1b;如果没有这样的已排队线程&#xff0c;则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法&#xff08;例如 contains&#xff09;&#xff0c;SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。 public static void blockTest(){BlockingQueue<String> blockingDeque&#61;new SynchronousQueue<>();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {blockingDeque.put("a");System.out.println(Thread.currentThread().getName()&#43;"put...."&#43;i);}}catch (Exception e) {e.printStackTrace();}finally {}}, "t1").start();new Thread(()->{try {for (int i &#61; 0; i < 10; i&#43;&#43;) {blockingDeque.take();TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()&#43;"get...."&#43;i);}}catch (Exception e) {e.printStackTrace();}finally {}}, "t2").start();}//运行结果&#xff1a;t1 -put....0t2 -get....0t1 -put....1t2 -get....1t1 -put....2
public static void test4() throws Exception{BlockingQueue<String> blockingDeque&#61;new ArrayBlockingQueue<>(3);blockingDeque.add("a");blockingDeque.remove("a");blockingDeque.offer("a");blockingDeque.offer("a",2,TimeUnit.SECONDS);blockingDeque.poll();//&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;blockingDeque.put("a");blockingDeque.take();}
当然阻塞队列除了可用于生产者—消费者&#xff0c;还可用于线程池&#xff0c;消息中间件。。