热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【源码篇】LinkedBlockingQueue源码超详细解读

文章目录⭐️导读🚩基本构造📖核心源码解读put方法polltake方法drainTo方法remove方法⚠️注意事项🚗应用场景⭐️导


文章目录

    • ⭐️导读
    • 🚩基本构造
    • 📖核心源码解读
      • `put`方法
      • `poll/take`方法
      • `drainTo`方法
      • `remove`方法
    • ⚠️注意事项
    • 🚗应用场景


⭐️导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。


🚩基本构造

image-20220421143744032

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常
boolean add(E e);
// 插入,失败返回false
boolean offer(E e);
// 移除队列头部元素,队列为空异常
E remove();
// 移除队列头部元素,队列为空返回null
E poll();
// 查看头部元素,队列为空异常
E element();
// 查看头部元素,队列为空返回null
E peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待
void put(E e);
// 将元素插入队列,如果队列没用可用空间则等待设定的时间
boolean offer(E e, long timeout, TimeUnit unit);
// 移除头部元素,如果没有可用则等待
E take();
// 移除头部元素,如果没有可用则等待设定的时间
E poll(long timeout, TimeUnit unit);
// 返回剩余可插入的元素数量
int remainingCapacity();
// 从队列中取出全部的元素并插入到指定集合中
int drainTo(Collection<? super E> c);
// 从队列中取出指定数量的元素并插入到指定集合中
int drainTo(Collection<? super E> c, int maxElements);

&#x1f4d6;核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition来控制他们的执行过程

// 读锁
private final ReentrantLock takeLock &#61; new ReentrantLock();
// 队列不为空的Condition
private final Condition notEmpty &#61; takeLock.newCondition();
// 写锁
private final ReentrantLock putLock &#61; new ReentrantLock();
// 队列没有满的Condition
private final Condition notFull &#61; putLock.newCondition();

put方法

将元素插入队列&#xff0c;如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {// 如果元素是null抛出异常if (e &#61;&#61; null) throw new NullPointerException();int c &#61; -1;Node<E> node &#61; new Node<E>(e);// 使用写锁final ReentrantLock putLock &#61; this.putLock;// 元素数量计数器final AtomicInteger count &#61; this.count;// 获得锁putLock.lockInterruptibly();try {// 判断队列是否已满,如果已满写线程等待while (count.get() &#61;&#61; capacity) {notFull.await();}// 将尾部元素指向当前元素enqueue(node);// 元素数量&#43;1并返回操作前的数量c &#61; count.getAndIncrement();// 如果元素数量没有满&#xff0c;则唤醒notFull.wait()&#xff0c;表示当前队列未满if (c &#43; 1 < capacity)notFull.signal();} finally {// 解锁putLock.unlock();}if (c &#61;&#61; 0)// 如果操作前元素数量为0&#xff0c;则通知写线程signalNotEmpty();
}

此处signalNotEmpty();就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费


poll/take方法

poll 查看头部元素&#xff0c;队列为空异常

take 移除并返回头部元素&#xff0c;如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x &#61; null;int c &#61; -1;// 等待纳秒数long nanos &#61; unit.toNanos(timeout);final AtomicInteger count &#61; this.count;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;// 加锁takeLock.lockInterruptibly();try {// 如果当前队列中没有元素则等待指定时长while (count.get() &#61;&#61; 0) {if (nanos <&#61; 0)// 等待超时&#xff0c;直接返回nullreturn null;nanos &#61; notEmpty.awaitNanos(nanos);}// 移除队列头中的节点并返回x &#61; dequeue();// 元素-1c &#61; count.getAndDecrement();// 如果队列中有数据&#xff0c;则通知其他线程该队列不为空if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 通知其他线程表示该队列未满if (c &#61;&#61; capacity)signalNotFull();return x;
}

此处signalNotFull();是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入

take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下

public E take() throws InterruptedException {E x;int c &#61; -1;final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {// 等待逻辑与poll不一样&#xff0c;此处表示如果没有数据则一直等待while (count.get() &#61;&#61; 0) {notEmpty.await();}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {if (c &#61;&#61; null)throw new NullPointerException();if (c &#61;&#61; this)throw new IllegalArgumentException();if (maxElements <&#61; 0)return 0;boolean signalNotFull &#61; false;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;takeLock.lock();try {// 读取数量不能大于剩余元素数量int n &#61; Math.min(maxElements, count.get());// 从头部开始读取&#xff0c;h表示当前头节点Node<E> h &#61; head;int i &#61; 0;try {while (i < n) {Node<E> p &#61; h.next;c.add(p.item);p.item &#61; null;h.next &#61; h;h &#61; p;&#43;&#43;i;}return n;} finally {if (i > 0) {// 有读取出来元素&#xff0c;则更新最新头节点head &#61; h;// 队列是否还有空间signalNotFull &#61; (count.getAndAdd(-i) &#61;&#61; capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}
}

remove方法

public boolean remove(Object o) {if (o &#61;&#61; null) return false;// 加锁fullyLock();try {// 遍历全部元素for (Node<E> trail &#61; head, p &#61; trail.next;p !&#61; null;trail &#61; p, p &#61; p.next) {if (o.equals(p.item)) {// 移除链接unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

注意一下此处的加锁逻辑

void fullyLock() {putLock.lock();takeLock.lock();
}

可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁

一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。


⚠️注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如&#xff1a;remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)&#43;悲观锁&#xff0c;如非必要最好不要使用


&#x1f697;应用场景

LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。


  1. 调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用drainTo归集然后统一请求。
  2. 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控

ResourcePool 对象池抽象类&#xff0c;实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {private final LinkedBlockingQueue<T> queue;public ResourcePool(int poolMax) {queue &#61; new LinkedBlockingQueue<>(poolMax);for (int i &#61; 0; i < poolMax; i&#43;&#43;) {T model &#61; createResource();model.pool &#61; this;model.invalid &#61; true;queue.add(model);}}public T getResource() {try {do {T t &#61; queue.take();if (t.invalid) {t.invalid &#61; false;return open(t);}} while (true);} catch (InterruptedException e) {throw new RuntimeException(e);}}protected T open(T t) {return t;}protected abstract T createResource();public void free(T t) {if (!t.invalid) {t.invalid &#61; true;queue.offer(close(t));}}protected T close(T t) {return t;}}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {ResourcePool pool;boolean invalid;&#64;Overridepublic void close() throws IOException {pool.free(this);}
}

TestModel对象实例

&#64;Setter
public class TestModel extends ResourceModel {public TestModel(String name, int age) {this.name &#61; name;this.age &#61; age;}private String name;private int age;}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {public TestPool(int poolMax) {super(poolMax);}// 创建对象实例&#64;Overrideprotected TestModel createResource() {return new TestModel("", 0);}// 获得对象的前置操作&#64;Overrideprotected TestModel open(TestModel testModel) {return super.open(testModel);}// 对象回收后操作&#64;Overrideprotected TestModel close(TestModel testModel) {testModel.setAge(0);testModel.setName("");return super.close(testModel);}
}

使用方式1

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);// 从池中获得一个对象TestModel model &#61; testPool.getResource();// 回收对象model.close();
}

使用方式2

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);try(TestModel model &#61; testPool.getResource()) {}
}

推荐阅读
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • Kubernetes 持久化存储与数据卷详解
    本文深入探讨 Kubernetes 中持久化存储的使用场景、PV/PVC/StorageClass 的基本操作及其实现原理,旨在帮助读者理解如何高效管理容器化应用的数据持久化需求。 ... [详细]
  • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
    本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
  • 在高并发需求的C++项目中,我们最初选择了JsonCpp进行JSON解析和序列化。然而,在处理大数据量时,JsonCpp频繁抛出异常,尤其是在多线程环境下问题更为突出。通过分析发现,旧版本的JsonCpp存在多线程安全性和性能瓶颈。经过评估,我们最终选择了RapidJSON作为替代方案,并实现了显著的性能提升。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 优化局域网SSH连接延迟问题的解决方案
    本文介绍了解决局域网内SSH连接到服务器时出现长时间等待问题的方法。通过调整配置和优化网络设置,可以显著缩短SSH连接的时间。 ... [详细]
  • 深入理解Redis的数据结构与对象系统
    本文详细探讨了Redis中的数据结构和对象系统的实现,包括字符串、列表、集合、哈希表和有序集合等五种核心对象类型,以及它们所使用的底层数据结构。通过分析源码和相关文献,帮助读者更好地理解Redis的设计原理。 ... [详细]
  • 作为一名 Ember.js 新手,了解如何在路由和模型中正确加载 JSON 数据是至关重要的。本文将探讨两者之间的差异,并提供实用的建议。 ... [详细]
  • 开发笔记:9.八大排序
    开发笔记:9.八大排序 ... [详细]
  • 异常要理解Java异常处理是如何工作的,需要掌握一下三种异常类型:检查性异常:最具代表性的检查性异常是用户错误或问题引起的异常ÿ ... [详细]
  • 本题探讨了在大数据结构背景下,如何通过整体二分和CDQ分治等高级算法优化处理复杂的时间序列问题。题目设定包括节点数量、查询次数和权重限制,并详细分析了解决方案中的关键步骤。 ... [详细]
  • 本文介绍了如何在多线程环境中实现异步任务的事务控制,确保任务执行的一致性和可靠性。通过使用计数器和异常标记字段,系统能够准确判断所有异步线程的执行结果,并根据结果决定是否回滚或提交事务。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • Spring Boot单元测试中Redis连接失败的解决方案
    本文探讨了在Spring Boot项目中进行单元测试时遇到Redis连接问题的原因及解决方法,详细分析了配置文件加载路径不当导致的问题,并提供了有效的解决方案。 ... [详细]
  • 主板IO用W83627THG,用VC如何取得CPU温度,系统温度,CPU风扇转速,VBat的电压. ... [详细]
author-avatar
上海悠u7_
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有