热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【源码篇】LinkedBlockingQueue源码超详细解读

文章目录⭐️导读🚩基本构造📖核心源码解读put方法polltake方法drainTo方法remove方法⚠️注意事项🚗应用场景⭐️导


文章目录

    • ⭐️导读
    • 🚩基本构造
    • 📖核心源码解读
      • `put`方法
      • `poll/take`方法
      • `drainTo`方法
      • `remove`方法
    • ⚠️注意事项
    • 🚗应用场景


⭐️导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。


🚩基本构造

image-20220421143744032

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常
boolean add(E e);
// 插入,失败返回false
boolean offer(E e);
// 移除队列头部元素,队列为空异常
E remove();
// 移除队列头部元素,队列为空返回null
E poll();
// 查看头部元素,队列为空异常
E element();
// 查看头部元素,队列为空返回null
E peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待
void put(E e);
// 将元素插入队列,如果队列没用可用空间则等待设定的时间
boolean offer(E e, long timeout, TimeUnit unit);
// 移除头部元素,如果没有可用则等待
E take();
// 移除头部元素,如果没有可用则等待设定的时间
E poll(long timeout, TimeUnit unit);
// 返回剩余可插入的元素数量
int remainingCapacity();
// 从队列中取出全部的元素并插入到指定集合中
int drainTo(Collection<? super E> c);
// 从队列中取出指定数量的元素并插入到指定集合中
int drainTo(Collection<? super E> c, int maxElements);

&#x1f4d6;核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition来控制他们的执行过程

// 读锁
private final ReentrantLock takeLock &#61; new ReentrantLock();
// 队列不为空的Condition
private final Condition notEmpty &#61; takeLock.newCondition();
// 写锁
private final ReentrantLock putLock &#61; new ReentrantLock();
// 队列没有满的Condition
private final Condition notFull &#61; putLock.newCondition();

put方法

将元素插入队列&#xff0c;如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {// 如果元素是null抛出异常if (e &#61;&#61; null) throw new NullPointerException();int c &#61; -1;Node<E> node &#61; new Node<E>(e);// 使用写锁final ReentrantLock putLock &#61; this.putLock;// 元素数量计数器final AtomicInteger count &#61; this.count;// 获得锁putLock.lockInterruptibly();try {// 判断队列是否已满,如果已满写线程等待while (count.get() &#61;&#61; capacity) {notFull.await();}// 将尾部元素指向当前元素enqueue(node);// 元素数量&#43;1并返回操作前的数量c &#61; count.getAndIncrement();// 如果元素数量没有满&#xff0c;则唤醒notFull.wait()&#xff0c;表示当前队列未满if (c &#43; 1 < capacity)notFull.signal();} finally {// 解锁putLock.unlock();}if (c &#61;&#61; 0)// 如果操作前元素数量为0&#xff0c;则通知写线程signalNotEmpty();
}

此处signalNotEmpty();就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费


poll/take方法

poll 查看头部元素&#xff0c;队列为空异常

take 移除并返回头部元素&#xff0c;如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x &#61; null;int c &#61; -1;// 等待纳秒数long nanos &#61; unit.toNanos(timeout);final AtomicInteger count &#61; this.count;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;// 加锁takeLock.lockInterruptibly();try {// 如果当前队列中没有元素则等待指定时长while (count.get() &#61;&#61; 0) {if (nanos <&#61; 0)// 等待超时&#xff0c;直接返回nullreturn null;nanos &#61; notEmpty.awaitNanos(nanos);}// 移除队列头中的节点并返回x &#61; dequeue();// 元素-1c &#61; count.getAndDecrement();// 如果队列中有数据&#xff0c;则通知其他线程该队列不为空if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 通知其他线程表示该队列未满if (c &#61;&#61; capacity)signalNotFull();return x;
}

此处signalNotFull();是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入

take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下

public E take() throws InterruptedException {E x;int c &#61; -1;final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {// 等待逻辑与poll不一样&#xff0c;此处表示如果没有数据则一直等待while (count.get() &#61;&#61; 0) {notEmpty.await();}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {if (c &#61;&#61; null)throw new NullPointerException();if (c &#61;&#61; this)throw new IllegalArgumentException();if (maxElements <&#61; 0)return 0;boolean signalNotFull &#61; false;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;takeLock.lock();try {// 读取数量不能大于剩余元素数量int n &#61; Math.min(maxElements, count.get());// 从头部开始读取&#xff0c;h表示当前头节点Node<E> h &#61; head;int i &#61; 0;try {while (i < n) {Node<E> p &#61; h.next;c.add(p.item);p.item &#61; null;h.next &#61; h;h &#61; p;&#43;&#43;i;}return n;} finally {if (i > 0) {// 有读取出来元素&#xff0c;则更新最新头节点head &#61; h;// 队列是否还有空间signalNotFull &#61; (count.getAndAdd(-i) &#61;&#61; capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}
}

remove方法

public boolean remove(Object o) {if (o &#61;&#61; null) return false;// 加锁fullyLock();try {// 遍历全部元素for (Node<E> trail &#61; head, p &#61; trail.next;p !&#61; null;trail &#61; p, p &#61; p.next) {if (o.equals(p.item)) {// 移除链接unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

注意一下此处的加锁逻辑

void fullyLock() {putLock.lock();takeLock.lock();
}

可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁

一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。


⚠️注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如&#xff1a;remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)&#43;悲观锁&#xff0c;如非必要最好不要使用


&#x1f697;应用场景

LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。


  1. 调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用drainTo归集然后统一请求。
  2. 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控

ResourcePool 对象池抽象类&#xff0c;实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {private final LinkedBlockingQueue<T> queue;public ResourcePool(int poolMax) {queue &#61; new LinkedBlockingQueue<>(poolMax);for (int i &#61; 0; i < poolMax; i&#43;&#43;) {T model &#61; createResource();model.pool &#61; this;model.invalid &#61; true;queue.add(model);}}public T getResource() {try {do {T t &#61; queue.take();if (t.invalid) {t.invalid &#61; false;return open(t);}} while (true);} catch (InterruptedException e) {throw new RuntimeException(e);}}protected T open(T t) {return t;}protected abstract T createResource();public void free(T t) {if (!t.invalid) {t.invalid &#61; true;queue.offer(close(t));}}protected T close(T t) {return t;}}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {ResourcePool pool;boolean invalid;&#64;Overridepublic void close() throws IOException {pool.free(this);}
}

TestModel对象实例

&#64;Setter
public class TestModel extends ResourceModel {public TestModel(String name, int age) {this.name &#61; name;this.age &#61; age;}private String name;private int age;}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {public TestPool(int poolMax) {super(poolMax);}// 创建对象实例&#64;Overrideprotected TestModel createResource() {return new TestModel("", 0);}// 获得对象的前置操作&#64;Overrideprotected TestModel open(TestModel testModel) {return super.open(testModel);}// 对象回收后操作&#64;Overrideprotected TestModel close(TestModel testModel) {testModel.setAge(0);testModel.setName("");return super.close(testModel);}
}

使用方式1

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);// 从池中获得一个对象TestModel model &#61; testPool.getResource();// 回收对象model.close();
}

使用方式2

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);try(TestModel model &#61; testPool.getResource()) {}
}

推荐阅读
  • JUC并发编程——线程的基本方法使用
    目录一、线程名称设置和获取二、线程的sleep()三、线程的interrupt四、join()五、yield()六、wait(),notify(),notifyAll( ... [详细]
  • 前言:由于Android系统本身决定了其自身的单线程模型结构。在日常的开发过程中,我们又不能把所有的工作都交给主线程去处理(会造成UI卡顿现象)。因此,适当的创建子线程去处理一些耗 ... [详细]
  • 在iOS开发中,多线程技术的应用非常广泛,能够高效地执行多个调度任务。本文将重点介绍GCD(Grand Central Dispatch)在多线程开发中的应用,包括其函数和队列的实现细节。 ... [详细]
  • 本文详细介绍了 Java 中 org.w3c.dom.Node 类的 isEqualNode() 方法的功能、参数及返回值,并通过多个实际代码示例来展示其具体应用。此方法用于检测两个节点是否相等,而不仅仅是判断它们是否为同一个对象。 ... [详细]
  • 在Effective Java第三版中,建议在方法返回类型中优先考虑使用Collection而非Stream,以提高代码的灵活性和兼容性。 ... [详细]
  • 本文探讨了在使用JavaMail发送电子邮件时,抄送功能未能正常工作的问题,并提供了详细的代码示例和解决方法。 ... [详细]
  • 本文详细介绍了Java中HashSet的工作原理及其源码分析。HashSet实现了Set接口,内部通过HashMap来存储数据,不保证元素的迭代顺序,且允许null值的存在。文章不仅涵盖了HashSet的基本概念,还深入探讨了其内部实现细节。 ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • 本文介绍了如何将Spring属性占位符与Jersey的@Path和@ApplicationPath注解结合使用,以便在资源路径中动态解析属性值。 ... [详细]
  • 在Java中,每个对象都继承自Object类,并拥有equals、toString等方法。本练习要求定义一个PersonOverride类,并覆盖其toString和equals方法。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • pypy 真的能让 Python 比 C 还快么?
    作者:肖恩顿来源:游戏不存在最近“pypy为什么能让python比c还快”刷屏了,原文讲的内容偏理论,干货比较少。我们可以再深入一点点,了解pypy的真相。正式开始之前,多唠叨两句 ... [详细]
  • 本文介绍了一种通过设置主题(Theme)来实现快速启动的Android引导页,并详细说明了如何避免因不同屏幕分辨率导致的图片拉伸问题。 ... [详细]
  • DirectShow Filter 开发指南
    本文总结了 DirectShow Filter 的开发经验,重点介绍了 Source Filter、In-Place Transform Filter 和 Render Filter 的实现方法。通过使用 DirectShow 提供的类,可以简化 Filter 的开发过程。 ... [详细]
  • 本文将通过一个简单的示例代码,介绍如何在 Java 中获取对象中值为 null 的字段名称。 ... [详细]
author-avatar
上海悠u7_
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有