热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【源码篇】LinkedBlockingQueue源码超详细解读

文章目录⭐️导读🚩基本构造📖核心源码解读put方法polltake方法drainTo方法remove方法⚠️注意事项🚗应用场景⭐️导


文章目录

    • ⭐️导读
    • 🚩基本构造
    • 📖核心源码解读
      • `put`方法
      • `poll/take`方法
      • `drainTo`方法
      • `remove`方法
    • ⚠️注意事项
    • 🚗应用场景


⭐️导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。


🚩基本构造

image-20220421143744032

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常
boolean add(E e);
// 插入,失败返回false
boolean offer(E e);
// 移除队列头部元素,队列为空异常
E remove();
// 移除队列头部元素,队列为空返回null
E poll();
// 查看头部元素,队列为空异常
E element();
// 查看头部元素,队列为空返回null
E peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待
void put(E e);
// 将元素插入队列,如果队列没用可用空间则等待设定的时间
boolean offer(E e, long timeout, TimeUnit unit);
// 移除头部元素,如果没有可用则等待
E take();
// 移除头部元素,如果没有可用则等待设定的时间
E poll(long timeout, TimeUnit unit);
// 返回剩余可插入的元素数量
int remainingCapacity();
// 从队列中取出全部的元素并插入到指定集合中
int drainTo(Collection<? super E> c);
// 从队列中取出指定数量的元素并插入到指定集合中
int drainTo(Collection<? super E> c, int maxElements);

&#x1f4d6;核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition来控制他们的执行过程

// 读锁
private final ReentrantLock takeLock &#61; new ReentrantLock();
// 队列不为空的Condition
private final Condition notEmpty &#61; takeLock.newCondition();
// 写锁
private final ReentrantLock putLock &#61; new ReentrantLock();
// 队列没有满的Condition
private final Condition notFull &#61; putLock.newCondition();

put方法

将元素插入队列&#xff0c;如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {// 如果元素是null抛出异常if (e &#61;&#61; null) throw new NullPointerException();int c &#61; -1;Node<E> node &#61; new Node<E>(e);// 使用写锁final ReentrantLock putLock &#61; this.putLock;// 元素数量计数器final AtomicInteger count &#61; this.count;// 获得锁putLock.lockInterruptibly();try {// 判断队列是否已满,如果已满写线程等待while (count.get() &#61;&#61; capacity) {notFull.await();}// 将尾部元素指向当前元素enqueue(node);// 元素数量&#43;1并返回操作前的数量c &#61; count.getAndIncrement();// 如果元素数量没有满&#xff0c;则唤醒notFull.wait()&#xff0c;表示当前队列未满if (c &#43; 1 < capacity)notFull.signal();} finally {// 解锁putLock.unlock();}if (c &#61;&#61; 0)// 如果操作前元素数量为0&#xff0c;则通知写线程signalNotEmpty();
}

此处signalNotEmpty();就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费


poll/take方法

poll 查看头部元素&#xff0c;队列为空异常

take 移除并返回头部元素&#xff0c;如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x &#61; null;int c &#61; -1;// 等待纳秒数long nanos &#61; unit.toNanos(timeout);final AtomicInteger count &#61; this.count;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;// 加锁takeLock.lockInterruptibly();try {// 如果当前队列中没有元素则等待指定时长while (count.get() &#61;&#61; 0) {if (nanos <&#61; 0)// 等待超时&#xff0c;直接返回nullreturn null;nanos &#61; notEmpty.awaitNanos(nanos);}// 移除队列头中的节点并返回x &#61; dequeue();// 元素-1c &#61; count.getAndDecrement();// 如果队列中有数据&#xff0c;则通知其他线程该队列不为空if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 通知其他线程表示该队列未满if (c &#61;&#61; capacity)signalNotFull();return x;
}

此处signalNotFull();是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入

take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下

public E take() throws InterruptedException {E x;int c &#61; -1;final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {// 等待逻辑与poll不一样&#xff0c;此处表示如果没有数据则一直等待while (count.get() &#61;&#61; 0) {notEmpty.await();}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {if (c &#61;&#61; null)throw new NullPointerException();if (c &#61;&#61; this)throw new IllegalArgumentException();if (maxElements <&#61; 0)return 0;boolean signalNotFull &#61; false;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;takeLock.lock();try {// 读取数量不能大于剩余元素数量int n &#61; Math.min(maxElements, count.get());// 从头部开始读取&#xff0c;h表示当前头节点Node<E> h &#61; head;int i &#61; 0;try {while (i < n) {Node<E> p &#61; h.next;c.add(p.item);p.item &#61; null;h.next &#61; h;h &#61; p;&#43;&#43;i;}return n;} finally {if (i > 0) {// 有读取出来元素&#xff0c;则更新最新头节点head &#61; h;// 队列是否还有空间signalNotFull &#61; (count.getAndAdd(-i) &#61;&#61; capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}
}

remove方法

public boolean remove(Object o) {if (o &#61;&#61; null) return false;// 加锁fullyLock();try {// 遍历全部元素for (Node<E> trail &#61; head, p &#61; trail.next;p !&#61; null;trail &#61; p, p &#61; p.next) {if (o.equals(p.item)) {// 移除链接unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

注意一下此处的加锁逻辑

void fullyLock() {putLock.lock();takeLock.lock();
}

可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁

一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。


⚠️注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如&#xff1a;remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)&#43;悲观锁&#xff0c;如非必要最好不要使用


&#x1f697;应用场景

LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。


  1. 调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用drainTo归集然后统一请求。
  2. 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控

ResourcePool 对象池抽象类&#xff0c;实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {private final LinkedBlockingQueue<T> queue;public ResourcePool(int poolMax) {queue &#61; new LinkedBlockingQueue<>(poolMax);for (int i &#61; 0; i < poolMax; i&#43;&#43;) {T model &#61; createResource();model.pool &#61; this;model.invalid &#61; true;queue.add(model);}}public T getResource() {try {do {T t &#61; queue.take();if (t.invalid) {t.invalid &#61; false;return open(t);}} while (true);} catch (InterruptedException e) {throw new RuntimeException(e);}}protected T open(T t) {return t;}protected abstract T createResource();public void free(T t) {if (!t.invalid) {t.invalid &#61; true;queue.offer(close(t));}}protected T close(T t) {return t;}}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {ResourcePool pool;boolean invalid;&#64;Overridepublic void close() throws IOException {pool.free(this);}
}

TestModel对象实例

&#64;Setter
public class TestModel extends ResourceModel {public TestModel(String name, int age) {this.name &#61; name;this.age &#61; age;}private String name;private int age;}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {public TestPool(int poolMax) {super(poolMax);}// 创建对象实例&#64;Overrideprotected TestModel createResource() {return new TestModel("", 0);}// 获得对象的前置操作&#64;Overrideprotected TestModel open(TestModel testModel) {return super.open(testModel);}// 对象回收后操作&#64;Overrideprotected TestModel close(TestModel testModel) {testModel.setAge(0);testModel.setName("");return super.close(testModel);}
}

使用方式1

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);// 从池中获得一个对象TestModel model &#61; testPool.getResource();// 回收对象model.close();
}

使用方式2

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);try(TestModel model &#61; testPool.getResource()) {}
}

推荐阅读
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Java 中的 BigDecimal pow()方法,示例 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 深入解析JVM垃圾收集器
    本文基于《深入理解Java虚拟机:JVM高级特性与最佳实践》第二版,详细探讨了JVM中不同类型的垃圾收集器及其工作原理。通过介绍各种垃圾收集器的特性和应用场景,帮助读者更好地理解和优化JVM内存管理。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • 技术分享:从动态网站提取站点密钥的解决方案
    本文探讨了如何从动态网站中提取站点密钥,特别是针对验证码(reCAPTCHA)的处理方法。通过结合Selenium和requests库,提供了详细的代码示例和优化建议。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • Android 渐变圆环加载控件实现
    本文介绍了如何在 Android 中创建一个自定义的渐变圆环加载控件,该控件已在多个知名应用中使用。我们将详细探讨其工作原理和实现方法。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
author-avatar
上海悠u7_
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有