热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

AQS之BlockingQueue源码解析

目录BlockingQueue介绍队列类型常见的5种阻塞队列BlockingQueue中APIBlockingQueue源码解析ArrayBlockingQueue简单

目录

BlockingQueue介绍

队列类型

常见的5种阻塞队列

BlockingQueue中API

BlockingQueue源码解析

ArrayBlockingQueue简单图解




BlockingQueue介绍

BlockingQueue 是一个先进先出(FIFO)的阻塞队列。

BlockingQueue,java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制。

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于数据的入队,出队的原理,却是大同小异。

 


队列类型

1. 无限队列 (unbounded queue ) - 几乎可以无限增长

2. 有限队列 ( bounded queue ) - 定义了最大容量

通常用链表或者数组实现

一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级

队列主要操作:入队(enqueue)与出队(dequeue)

 


5

ArrayBlockingQueue 由数组支持的有界队列

LinkedBlockingQueue 由链接节点支持的可选有界队列

PriorityBlockingQueue 由优先级堆支持的无界优先级队列

DelayQueue 由优先级堆支持的、基于时间的调度队列

SynchronousQueue 没有容量,不存储元素的阻塞队列,也即单个元素的队列

 


BlockingQueue中API

offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。

offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.

add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。

put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。

take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。

poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。

remainingCapacity():获取队列中剩余的空间。

remove(Object o): 从队列中移除指定的值。

contains(Object o): 判断队列中是否拥有该值。

drainTo(Collection c): 将队列中值,全部移除,并发设置到给定的集合中。

 


BlockingQueue源码解析

下面的源码以ArrayBlockingQueue为例。ArrayBlockingQueue是一个典型的生产者-消费者的模式。带着生产者-消费者的思想去阅读源码

参数解析:

//保存数据的数组,可以看出队列的底层一个数组的数据结构
final Object[] items;//下一次获取元素的数组下标
int takeIndex;//下一次添加元素的数组下标
int putIndex;//队列中元素的数量
int count;//可重入锁
final ReentrantLock lock;//队列不为空的信号量
private final Condition notEmpty;//队列不满的信号量
private final Condition notFull;//当前迭代器的共享状态,如果已知不存在任何迭代器,则为null。允许队列操作更新迭代器状态。
transient Itrs itrs = null;

//构造器
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <&#61; 0)throw new IllegalArgumentException();//初始化数组this.items &#61; new Object[capacity];//创建非公平锁lock &#61; new ReentrantLock(fair);//初始化不为空的条件锁notEmpty &#61; lock.newCondition();//初始化不满的条件锁notFull &#61; lock.newCondition();
}

//实例化一个ArrayBlockingQueue
static BlockingQueue queue &#61; new ArrayBlockingQueue(16);
//其他业务代码
//往队列中添加数据
queue.put(data);

将添加数据的操作看成一个生产者&#xff0c;往队列中添加数据的常用的2种方式&#xff0c;offer和put

//往队列插入一条数据&#xff0c;成功返回true,否则返回false&#xff0c;其实add()最终调的也是offer()
public boolean offer(E e) {//校验参数是否为nullcheckNotNull(e);//开始加锁final ReentrantLock lock &#61; this.lock;lock.lock();try {//如果队列里的元素已经放满了&#xff0c;返回falseif (count &#61;&#61; items.length)return false;else {//队列还未满&#xff0c;元素入队enqueue(e);return true;}} finally {lock.unlock();}
}

//往队列插入一条数据&#xff0c;如果队列满了&#xff0c;就进行阻塞&#xff0c;等到队列有空间的时候再入队
public void put(E e) throws InterruptedException {//校验参数是否为nullcheckNotNull(e);final ReentrantLock lock &#61; this.lock;lock.lockInterruptibly();try {while (count &#61;&#61; items.length)//如果队列里的元素已经放满了&#xff0c;则进行阻塞&#xff0c;最终会调用LockSupport.park(this); 进行阻塞&#xff0c;然后等待元素出队时进行唤醒notFull.await();//队列还未满&#xff0c;元素入队enqueue(e);} finally {lock.unlock();}
}

入队操作&#xff1a;

//元素入队
private void enqueue(E x) {// assert lock.getHoldCount() &#61;&#61; 1;// assert items[putIndex] &#61;&#61; null;final Object[] items &#61; this.items;//将参数赋值&#xff0c;这个不知道怎么解释&#xff0c;根据图来理解吧items[putIndex] &#61; x;if (&#43;&#43;putIndex &#61;&#61; items.length)//如果队列已经满了&#xff0c;那么下一个参数就是从头&#xff08;数组下标为0的&#xff09;开始入队putIndex &#61; 0;//统计队列中元素的数量count&#43;&#43;;//一旦有数据入队&#xff0c;说明队列不为空&#xff0c;即通知消费者去从队列中拿数据&#xff0c;标识队列不为空的信号量notEmpty.signal();
}

public final void signal() {//判断当前线程是不是获取锁线程if (!isHeldExclusively())throw new IllegalMonitorStateException();//从条件队列中获取第一个节点Node first &#61; firstWaiter;if (first !&#61; null)//第一个节点不为空时&#xff0c;去唤醒第二个节点中的线程&#xff08;此时这个线程应该是消费者&#xff09;&#xff0c;最终会调用LockSupport.unpark(node.thread);//为什么是取去唤醒第二个节点中的线程&#xff1f;因为真实存放线程的是从第二个节点开始的&#xff0c;详情可见ReentrantLock的源码解析doSignal(first);
}

此时数据已经入队成功&#xff0c;将获取数据看成一个消费者从队列中获取数据&#xff0c;获取数据常用的2种方式&#xff0c;poll和take

//直接从队列头中去取&#xff0c;有就取出来&#xff0c;并且删除队列中的元素&#xff0c;队列中没有元素就返回null
public E poll() {final ReentrantLock lock &#61; this.lock;lock.lock();try {//有元素的话&#xff0c;直接从队列头中去取return (count &#61;&#61; 0) ? null : dequeue();} finally {lock.unlock();}
}

//从队列头中去取元素&#xff0c;有就取出来&#xff0c;并且删除队列中的元素&#xff0c;队列中没有元素则阻塞等待&#xff0c;一直等到有元素就取出来
public E take() throws InterruptedException {final ReentrantLock lock &#61; this.lock;lock.lockInterruptibly();try {while (count &#61;&#61; 0)//队列中没有元素则阻塞等待,最终会调用LockSupport.park(this); 进行阻塞&#xff0c;然后等待元素入队时进行唤醒notEmpty.await();//有元素的话&#xff0c;直接从队列头中去取return dequeue();} finally {lock.unlock();}
}

元素出队操作&#xff1a;

//取队列元素
private E dequeue() {// assert lock.getHoldCount() &#61;&#61; 1;// assert items[takeIndex] !&#61; null;final Object[] items &#61; this.items;&#64;SuppressWarnings("unchecked")//获取到获取元素的指针的那个数据E x &#61; (E) items[takeIndex];//并且将该数据从队列中删除items[takeIndex] &#61; null;if (&#43;&#43;takeIndex &#61;&#61; items.length)//如果该元素已经是队列的最后一个元素了&#xff0c;那么下一个参数就是从头&#xff08;数组下标为0的&#xff09;开始获取takeIndex &#61; 0;//队列元素减一count--;if (itrs !&#61; null)itrs.elementDequeued();//一旦有数据出队&#xff0c;说明队列不满&#xff0c;即通知生产者去往队列中补充数据&#xff0c;(标识队列不满的信号量)notFull.signal();return x;
}

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();//从条件队列中获取第一个节点Node first &#61; firstWaiter;if (first !&#61; null)//第一个节点不为空时&#xff0c;去唤醒第二个节点中的线程&#xff08;此时这个线程应该是生产者&#xff09;&#xff0c;最终会调用LockSupport.unpark(node.thread);//为什么是取去唤醒第二个节点中的线程&#xff1f;因为真实存放线程的是从第二个节点开始的&#xff0c;详情可见ReentrantLock的源码解析doSignal(first);
}

至此&#xff0c;一个元素从生产者入队列&#xff0c;再到消费者出队列的一个完整的流程就走完了。源码解析中的 doSignal(first); 这块代码的解析我没有给贴出来&#xff0c;因为这里面的逻辑比较绕&#xff0c;是将一个Node节点在CLH队列和条件队列相互转移&#xff0c;比较复杂&#xff0c;有些地方我一时还表达不好&#xff0c;所以就不详细说明了&#xff0c;这里面的逻辑大家可以自己跟进去看一下。

 


ArrayBlockingQueue简单图解

 

 

 

 


推荐阅读
  • 在处理高并发场景时,确保业务逻辑的正确性是关键。本文深入探讨了Java原生锁机制的多种细粒度实现方法,旨在通过使用数据的时间戳、ID等关键字段进行锁定,以最小化对系统性能的影响。文章详细分析了不同锁策略的优缺点,并提供了实际应用中的最佳实践,帮助开发者在高并发环境下高效地实现锁机制。 ... [详细]
  • 本书详细介绍了在最新Linux 4.0内核环境下进行Java与Linux设备驱动开发的全面指南。内容涵盖设备驱动的基本概念、开发环境的搭建、操作系统对设备驱动的影响以及具体开发步骤和技巧。通过丰富的实例和深入的技术解析,帮助读者掌握设备驱动开发的核心技术和最佳实践。 ... [详细]
  • 【并发编程】全面解析 Java 内存模型,一篇文章带你彻底掌握
    本文深入解析了 Java 内存模型(JMM),从基础概念到高级特性进行全面讲解,帮助读者彻底掌握 JMM 的核心原理和应用技巧。通过详细分析内存可见性、原子性和有序性等问题,结合实际代码示例,使开发者能够更好地理解和优化多线程并发程序。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 本文深入探讨了在Android应用开发中常见的相机连接故障问题,特别是在RK3288平台和Android 6.0系统上。通过分析具体案例,本文提供了详细的解决方案和应对策略,旨在帮助开发者有效解决相机连接问题,提升应用的稳定性和用户体验。 ... [详细]
  • Node.js 教程第五讲:深入解析 EventEmitter(事件监听与发射机制)
    本文将深入探讨 Node.js 中的 EventEmitter 模块,详细介绍其在事件监听与发射机制中的应用。内容涵盖事件驱动的基本概念、如何在 Node.js 中注册和触发自定义事件,以及 EventEmitter 的核心 API 和使用方法。通过本教程,读者将能够全面理解并熟练运用 EventEmitter 进行高效的事件处理。 ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • 结语 | 《探索二进制世界:软件安全与逆向分析》读书笔记:深入理解二进制代码的逆向工程方法
    结语 | 《探索二进制世界:软件安全与逆向分析》读书笔记:深入理解二进制代码的逆向工程方法 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 在处理大规模并发请求时,传统的多线程或多进程模型往往无法有效解决性能瓶颈问题。尽管它们在处理小规模任务时能提升效率,但在高并发场景下,系统资源的过度消耗和上下文切换的开销会显著降低整体性能。相比之下,Python 的 `asyncio` 模块通过协程提供了一种轻量级且高效的并发解决方案。本文将深入解析 `asyncio` 模块的原理及其在实际应用中的优化技巧,帮助开发者更好地利用协程技术提升程序性能。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
  • 在 Linux 系统中,`/proc` 目录实现了一种特殊的文件系统,称为 proc 文件系统。与传统的文件系统不同,proc 文件系统主要用于提供内核和进程信息的动态视图,通过文件和目录的形式呈现。这些信息包括系统状态、进程细节以及各种内核参数,为系统管理员和开发者提供了强大的诊断和调试工具。此外,proc 文件系统还支持实时读取和修改某些内核参数,增强了系统的灵活性和可配置性。 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
author-avatar
手机用户2502931035
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有