热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

并发和多线程(二十)–LinkedBlockingQueue源码解析

目录类定义QueueBlockingQueue成员变量构造函数offer()put()take()enqueueanddequeuepeek()阻塞队列在日常开发中直接使用比较少,

目录



  • 类定义

    • Queue

    • BlockingQueue



  • 成员变量

  • 构造函数

  • offer()

  • put()

  • take()

  • enqueue and dequeue

  • peek()


阻塞队列在日常开发中直接使用比较少,但是在很多工具类或者框架中有很多应用,例如线程池,消息队列等。所以,深入了解阻塞队列也是很有必要的。所以这里来了解一下LinkedBlockingQueue的相关源码,从命名可以看出来是由链表实现的数据结构。


类定义

public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
}

从上面可以看到继承Queue,实现BlockingQueue,我们介绍一下两个类里面方法的作用,为了方便记忆和对比放到表格里进行展示。


Queue






























作用方法1方法2区别
新增add()offer()add()队列满的时候抛出异常,offer()队列满的时候返回false
查看并删除remove()poll()remove()队列为空的时候抛出异常,poll()队列为空的时候返回null
查看不删除element()peek()element()队列为空的时候抛出异常,peek()队列为空的时候返回null

BlockingQueue

BlockingQueue顾名思义带有阻塞的队列,方法有所区别,下面方法包含了Queue,因为属于继承关系,下面表格方法名用序号代替。







































作用方法1方法2方法3方法4区别
新增add()offer()put()offer(E e, long timeout, TimeUnit unit)队列满的时候,1和2作用和queue相同,3会一直阻塞,4阻塞一段时间,返回false
查看并删除remove()poll()take()poll(long timeout, TimeUnit unit)队列为空,1和2没有变化,3会一直阻塞,4会阻塞一段时间,返回null
查看不删除element()peek()队列为空,1和2没有变化

成员变量

//链表的容量,默认Integer.MAX_VALUE
private final int capacity;
//当前存在元素数量
private final AtomicInteger count = new AtomicInteger();
//链表的head节点
transient Node head;
//链表的tail节点
private transient Node last;
//主要用于take, poll等方法的加锁
private final ReentrantLock takeLock = new ReentrantLock();
//主要用在取值的阻塞场景
private final Condition notEmpty = takeLock.newCondition();
//主要用于put, offer等方法的加锁
private final ReentrantLock putLock = new ReentrantLock();
//主要用在新增的阻塞场景
private final Condition notFull = putLock.newCondition();

//Node比较简单,一个item,还有指向下个节点,也就是单向链表
static class Node {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node next;
Node(E x) { item = x; }
}

构造函数

//默认队列存储的元素最大为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

//自定义capacity,初始化head、tail
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}

public LinkedBlockingQueue(Collection c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//加锁,因为是添加,肯定是putLock
putLock.lock();
try {
int n = 0;
//遍历集合,每次生成一个node,添加到链表尾部
for (E e : c) {
if (e == null)
throw new NullPointerException();
//每次判断新增的节点是否超过capacity,如果是,抛出异常
if (n == capacity)
throw new IllegalStateException("Queue full");
//将节点添加到队列tail
enqueue(new Node(e));
++n;
}
//设置当前元素个数count
count.set(n);
//finally解锁
} finally {
putLock.unlock();
}
}

offer()

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//初始设置为-1,c <0,表示新增失败
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() enqueue(node);
c = count.getAndIncrement();
if (c + 1 notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

流程:



  1. 如果e为空,抛出异常。

  2. 如果当前队列已满,返回false。

  3. 将e封装成一个新的节点,加锁,putLock。

  4. 再次判断队列元素数量
  5. CAS将count+1,注意这里调用的是getAndIncrement返回的是+1之前的值。如果队列没满,唤醒某个某个因为添加而阻塞的线程。

  6. finally解锁,如果c == 0,加锁takeLock,唤醒继续添加。

  7. 返回 c >= 0。


put()

相对于offer(),put的代码会判断当前队列是否满了,如果满了,通过Condition阻塞,其他没啥区别。

在这里插入图片描述


take()

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//注意这里c是先获取,后-1的
if (c == capacity)
signalNotFull();
return x;
}

流程:



  1. 加锁,takeLock。

  2. 如果当前队列为空,直接通过notEmpty阻塞,等待被唤醒。

  3. 取出第一个元素,并删除元素。

  4. 如果c > 1,表示队列还有元素,唤醒别的线程获取。

  5. finally解锁,如果c == capacity,表示队列没满,加锁takeLock,唤醒继续添加。

  6. 返回 x。


enqueue and dequeue

private void enqueue(Node node) {
last = last.next = node;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

这里统一讲一下在链表中添加和删除数据的流程,特别是dequeue(),我刚看第一眼的时候有点蒙蔽的,下面举个栗子。

BlockingQueue queue = new LinkedBlockingDeque<>();
queue.offer(1);
queue.offer(2);
queue.offer(3);
Integer take = queue.take();
System.out.println(take);

在这里插入图片描述

这里把每一步都画出来了,还是比较好理解的。其余方法的逻辑都比较相似,下面简单说一下。


peek()

peek()和take()的代码差不多,只是不会删除元素,take()通过dequeue(),而peek()通过一句代码Node first = head.next;获得该节点的数据然后返回。



推荐阅读
  • 本文详细介绍了Grand Central Dispatch (GCD) 的核心概念和使用方法,探讨了任务队列、同步与异步执行以及常见的死锁问题。通过具体示例和代码片段,帮助开发者更好地理解和应用GCD进行多线程开发。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 微软Exchange服务器遭遇2022年版“千年虫”漏洞
    微软Exchange服务器在新年伊始遭遇了一个类似于‘千年虫’的日期处理漏洞,导致邮件传输受阻。该问题主要影响配置了FIP-FS恶意软件引擎的Exchange 2016和2019版本。 ... [详细]
  • 作者:守望者1028链接:https:www.nowcoder.comdiscuss55353来源:牛客网面试高频题:校招过程中参考过牛客诸位大佬的面经,但是具体哪一块是参考谁的我 ... [详细]
  • FinOps 与 Serverless 的结合:破解云成本难题
    本文探讨了如何通过 FinOps 实践优化 Serverless 应用的成本管理,提出了首个 Serverless 函数总成本估计模型,并分享了多种有效的成本优化策略。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 深入解析Java多线程与并发库的应用:空中网实习生面试题详解
    本文详细探讨了Java多线程与并发库的高级应用,结合空中网在挑选实习生时的面试题目,深入分析了相关技术要点和实现细节。文章通过具体的代码示例展示了如何使用Semaphore和SynchronousQueue来管理线程同步和任务调度。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
  • Linux系统中Java程序Too Many Open Files问题的深入解析与解决方案
    本文详细分析了在Linux环境下运行的Java应用程序中可能出现的“Too many open files”异常现象,探讨其成因及解决方法。该问题通常出现在高并发文件访问或大量网络连接场景下,对系统性能和稳定性有较大影响。 ... [详细]
  • 在高并发需求的C++项目中,我们最初选择了JsonCpp进行JSON解析和序列化。然而,在处理大数据量时,JsonCpp频繁抛出异常,尤其是在多线程环境下问题更为突出。通过分析发现,旧版本的JsonCpp存在多线程安全性和性能瓶颈。经过评估,我们最终选择了RapidJSON作为替代方案,并实现了显著的性能提升。 ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • LeetCode: 实现队列与栈的高级应用
    本文介绍如何使用队列和栈实现特定功能,包括动态维护队列元素并计算其平均值,以及栈操作中的优化技巧。 ... [详细]
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
author-avatar
克阳光沫沫的幸福
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有