热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

并发和多线程(二十)–LinkedBlockingQueue源码解析

目录类定义QueueBlockingQueue成员变量构造函数offer()put()take()enqueueanddequeuepeek()阻塞队列在日常开发中直接使用比较少,

目录



  • 类定义

    • Queue

    • BlockingQueue



  • 成员变量

  • 构造函数

  • offer()

  • put()

  • take()

  • enqueue and dequeue

  • peek()


阻塞队列在日常开发中直接使用比较少,但是在很多工具类或者框架中有很多应用,例如线程池,消息队列等。所以,深入了解阻塞队列也是很有必要的。所以这里来了解一下LinkedBlockingQueue的相关源码,从命名可以看出来是由链表实现的数据结构。


类定义

public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
}

从上面可以看到继承Queue,实现BlockingQueue,我们介绍一下两个类里面方法的作用,为了方便记忆和对比放到表格里进行展示。


Queue






























作用方法1方法2区别
新增add()offer()add()队列满的时候抛出异常,offer()队列满的时候返回false
查看并删除remove()poll()remove()队列为空的时候抛出异常,poll()队列为空的时候返回null
查看不删除element()peek()element()队列为空的时候抛出异常,peek()队列为空的时候返回null

BlockingQueue

BlockingQueue顾名思义带有阻塞的队列,方法有所区别,下面方法包含了Queue,因为属于继承关系,下面表格方法名用序号代替。







































作用方法1方法2方法3方法4区别
新增add()offer()put()offer(E e, long timeout, TimeUnit unit)队列满的时候,1和2作用和queue相同,3会一直阻塞,4阻塞一段时间,返回false
查看并删除remove()poll()take()poll(long timeout, TimeUnit unit)队列为空,1和2没有变化,3会一直阻塞,4会阻塞一段时间,返回null
查看不删除element()peek()队列为空,1和2没有变化

成员变量

//链表的容量,默认Integer.MAX_VALUE
private final int capacity;
//当前存在元素数量
private final AtomicInteger count = new AtomicInteger();
//链表的head节点
transient Node head;
//链表的tail节点
private transient Node last;
//主要用于take, poll等方法的加锁
private final ReentrantLock takeLock = new ReentrantLock();
//主要用在取值的阻塞场景
private final Condition notEmpty = takeLock.newCondition();
//主要用于put, offer等方法的加锁
private final ReentrantLock putLock = new ReentrantLock();
//主要用在新增的阻塞场景
private final Condition notFull = putLock.newCondition();

//Node比较简单,一个item,还有指向下个节点,也就是单向链表
static class Node {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node next;
Node(E x) { item = x; }
}

构造函数

//默认队列存储的元素最大为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

//自定义capacity,初始化head、tail
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}

public LinkedBlockingQueue(Collection c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//加锁,因为是添加,肯定是putLock
putLock.lock();
try {
int n = 0;
//遍历集合,每次生成一个node,添加到链表尾部
for (E e : c) {
if (e == null)
throw new NullPointerException();
//每次判断新增的节点是否超过capacity,如果是,抛出异常
if (n == capacity)
throw new IllegalStateException("Queue full");
//将节点添加到队列tail
enqueue(new Node(e));
++n;
}
//设置当前元素个数count
count.set(n);
//finally解锁
} finally {
putLock.unlock();
}
}

offer()

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//初始设置为-1,c <0,表示新增失败
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() enqueue(node);
c = count.getAndIncrement();
if (c + 1 notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

流程:



  1. 如果e为空,抛出异常。

  2. 如果当前队列已满,返回false。

  3. 将e封装成一个新的节点,加锁,putLock。

  4. 再次判断队列元素数量
  5. CAS将count+1,注意这里调用的是getAndIncrement返回的是+1之前的值。如果队列没满,唤醒某个某个因为添加而阻塞的线程。

  6. finally解锁,如果c == 0,加锁takeLock,唤醒继续添加。

  7. 返回 c >= 0。


put()

相对于offer(),put的代码会判断当前队列是否满了,如果满了,通过Condition阻塞,其他没啥区别。

在这里插入图片描述


take()

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//注意这里c是先获取,后-1的
if (c == capacity)
signalNotFull();
return x;
}

流程:



  1. 加锁,takeLock。

  2. 如果当前队列为空,直接通过notEmpty阻塞,等待被唤醒。

  3. 取出第一个元素,并删除元素。

  4. 如果c > 1,表示队列还有元素,唤醒别的线程获取。

  5. finally解锁,如果c == capacity,表示队列没满,加锁takeLock,唤醒继续添加。

  6. 返回 x。


enqueue and dequeue

private void enqueue(Node node) {
last = last.next = node;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

这里统一讲一下在链表中添加和删除数据的流程,特别是dequeue(),我刚看第一眼的时候有点蒙蔽的,下面举个栗子。

BlockingQueue queue = new LinkedBlockingDeque<>();
queue.offer(1);
queue.offer(2);
queue.offer(3);
Integer take = queue.take();
System.out.println(take);

在这里插入图片描述

这里把每一步都画出来了,还是比较好理解的。其余方法的逻辑都比较相似,下面简单说一下。


peek()

peek()和take()的代码差不多,只是不会删除元素,take()通过dequeue(),而peek()通过一句代码Node first = head.next;获得该节点的数据然后返回。



推荐阅读
  • 在iOS开发中,多线程技术的应用非常广泛,能够高效地执行多个调度任务。本文将重点介绍GCD(Grand Central Dispatch)在多线程开发中的应用,包括其函数和队列的实现细节。 ... [详细]
  • 关于进程的复习:#管道#数据的共享Managerdictlist#进程池#cpu个数1#retmap(func,iterable)#异步自带close和join#所有 ... [详细]
  • 本文将深入探讨 iOS 中的 Grand Central Dispatch (GCD),并介绍如何利用 GCD 进行高效多线程编程。如果你对线程的基本概念还不熟悉,建议先阅读相关基础资料。 ... [详细]
  • 深入探讨:Actor模型如何解决并发与分布式计算难题
    在现代软件开发中,高并发和分布式系统的设计面临着诸多挑战。本文基于Akka最新文档,详细探讨了Actor模型如何有效地解决这些挑战,并提供了对并发和分布式计算的新视角。 ... [详细]
  • JUC并发编程——线程的基本方法使用
    目录一、线程名称设置和获取二、线程的sleep()三、线程的interrupt四、join()五、yield()六、wait(),notify(),notifyAll( ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • 大华股份2013届校园招聘软件算法类试题D卷
    一、填空题(共17题,每题3分,总共51分)1.设有inta5,*b,**c,执行语句c&b,b&a后,**c的值为________答:5 ... [详细]
  • 驱动程序的基本结构1、Windows驱动程序中重要的数据结构1.1、驱动对象(DRIVER_OBJECT)每个驱动程序会有唯一的驱动对象与之对应,并且这个驱动对象是在驱 ... [详细]
  • 我自己做了一个网站图片的抓取,感觉速度有点慢抓取4000张图片可能得用15分钟左右的时间,我百度看用线程可以加快抓取,然后创建了5个线程抓取,但是5个线程是同步执行同样的操作一个图片就 ... [详细]
  • 在运行于MS SQL Server 2005的.NET 2.0 Web应用中,我偶尔会遇到令人头疼的SQL死锁问题。过去,我们主要通过调整查询来解决这些问题,但这既耗时又不可靠。我希望能找到一种确定性的查询模式,确保从设计上彻底避免SQL死锁。 ... [详细]
  • 面试题总结_2019年全网最热门的123个Java并发面试题总结
    面试题总结_2019年全网最热门的123个Java并发面试题总结 ... [详细]
  • Java中的引用类型详解
    本文详细介绍了Java中的引用类型,包括强引用、软引用、弱引用和虚引用的特点和应用场景。 ... [详细]
  • 本文详细介绍了Sleep函数的基本概念、使用方法及其背后的实现原理。适合对Sleep函数的使用和实现感兴趣的开发者阅读。通过本文,您将了解如何在不同操作系统中使用Sleep函数,以及其在多线程编程中的重要性。 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • Android异步处理一:使用Thread+Handler实现非UI线程更新UI界面Android异步处理二:使用AsyncTask异步更新UI界面Android异步处理三:Handler+Loope ... [详细]
author-avatar
克阳光沫沫的幸福
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有