作者:手机用户2502931035 | 来源:互联网 | 2023-09-25 03:40
目录BlockingQueue介绍队列类型常见的5种阻塞队列BlockingQueue中APIBlockingQueue源码解析ArrayBlockingQueue简单
目录
BlockingQueue介绍
队列类型
常见的5种阻塞队列
BlockingQueue中API
BlockingQueue源码解析
ArrayBlockingQueue简单图解
BlockingQueue介绍
BlockingQueue 是一个先进先出(FIFO)的阻塞队列。
BlockingQueue,java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制。
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于数据的入队,出队的原理,却是大同小异。
队列类型
1. 无限队列 (unbounded queue ) - 几乎可以无限增长
2. 有限队列 ( bounded queue ) - 定义了最大容量
通常用链表或者数组实现
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级
队列主要操作:入队(enqueue)与出队(dequeue)
常见的5种阻塞队列
ArrayBlockingQueue 由数组支持的有界队列
LinkedBlockingQueue 由链接节点支持的可选有界队列
PriorityBlockingQueue 由优先级堆支持的无界优先级队列
DelayQueue 由优先级堆支持的、基于时间的调度队列
SynchronousQueue 没有容量,不存储元素的阻塞队列,也即单个元素的队列
BlockingQueue中API
offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
remainingCapacity():获取队列中剩余的空间。
remove(Object o): 从队列中移除指定的值。
contains(Object o): 判断队列中是否拥有该值。
drainTo(Collection c): 将队列中值,全部移除,并发设置到给定的集合中。
BlockingQueue源码解析
下面的源码以ArrayBlockingQueue为例。ArrayBlockingQueue是一个典型的生产者-消费者的模式。带着生产者-消费者的思想去阅读源码
参数解析:
//保存数据的数组,可以看出队列的底层一个数组的数据结构
final Object[] items;//下一次获取元素的数组下标
int takeIndex;//下一次添加元素的数组下标
int putIndex;//队列中元素的数量
int count;//可重入锁
final ReentrantLock lock;//队列不为空的信号量
private final Condition notEmpty;//队列不满的信号量
private final Condition notFull;//当前迭代器的共享状态,如果已知不存在任何迭代器,则为null。允许队列操作更新迭代器状态。
transient Itrs itrs = null;
//构造器
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <&#61; 0)throw new IllegalArgumentException();//初始化数组this.items &#61; new Object[capacity];//创建非公平锁lock &#61; new ReentrantLock(fair);//初始化不为空的条件锁notEmpty &#61; lock.newCondition();//初始化不满的条件锁notFull &#61; lock.newCondition();
}
//实例化一个ArrayBlockingQueue
static BlockingQueue queue &#61; new ArrayBlockingQueue(16);
//其他业务代码
//往队列中添加数据
queue.put(data); 将添加数据的操作看成一个生产者&#xff0c;往队列中添加数据的常用的2种方式&#xff0c;offer和put
//往队列插入一条数据&#xff0c;成功返回true,否则返回false&#xff0c;其实add()最终调的也是offer()
public boolean offer(E e) {//校验参数是否为nullcheckNotNull(e);//开始加锁final ReentrantLock lock &#61; this.lock;lock.lock();try {//如果队列里的元素已经放满了&#xff0c;返回falseif (count &#61;&#61; items.length)return false;else {//队列还未满&#xff0c;元素入队enqueue(e);return true;}} finally {lock.unlock();}
}
//往队列插入一条数据&#xff0c;如果队列满了&#xff0c;就进行阻塞&#xff0c;等到队列有空间的时候再入队
public void put(E e) throws InterruptedException {//校验参数是否为nullcheckNotNull(e);final ReentrantLock lock &#61; this.lock;lock.lockInterruptibly();try {while (count &#61;&#61; items.length)//如果队列里的元素已经放满了&#xff0c;则进行阻塞&#xff0c;最终会调用LockSupport.park(this); 进行阻塞&#xff0c;然后等待元素出队时进行唤醒notFull.await();//队列还未满&#xff0c;元素入队enqueue(e);} finally {lock.unlock();}
}
入队操作&#xff1a;
//元素入队
private void enqueue(E x) {// assert lock.getHoldCount() &#61;&#61; 1;// assert items[putIndex] &#61;&#61; null;final Object[] items &#61; this.items;//将参数赋值&#xff0c;这个不知道怎么解释&#xff0c;根据图来理解吧items[putIndex] &#61; x;if (&#43;&#43;putIndex &#61;&#61; items.length)//如果队列已经满了&#xff0c;那么下一个参数就是从头&#xff08;数组下标为0的&#xff09;开始入队putIndex &#61; 0;//统计队列中元素的数量count&#43;&#43;;//一旦有数据入队&#xff0c;说明队列不为空&#xff0c;即通知消费者去从队列中拿数据&#xff0c;标识队列不为空的信号量notEmpty.signal();
}
public final void signal() {//判断当前线程是不是获取锁线程if (!isHeldExclusively())throw new IllegalMonitorStateException();//从条件队列中获取第一个节点Node first &#61; firstWaiter;if (first !&#61; null)//第一个节点不为空时&#xff0c;去唤醒第二个节点中的线程&#xff08;此时这个线程应该是消费者&#xff09;&#xff0c;最终会调用LockSupport.unpark(node.thread);//为什么是取去唤醒第二个节点中的线程&#xff1f;因为真实存放线程的是从第二个节点开始的&#xff0c;详情可见ReentrantLock的源码解析doSignal(first);
}
此时数据已经入队成功&#xff0c;将获取数据看成一个消费者从队列中获取数据&#xff0c;获取数据常用的2种方式&#xff0c;poll和take
//直接从队列头中去取&#xff0c;有就取出来&#xff0c;并且删除队列中的元素&#xff0c;队列中没有元素就返回null
public E poll() {final ReentrantLock lock &#61; this.lock;lock.lock();try {//有元素的话&#xff0c;直接从队列头中去取return (count &#61;&#61; 0) ? null : dequeue();} finally {lock.unlock();}
}
//从队列头中去取元素&#xff0c;有就取出来&#xff0c;并且删除队列中的元素&#xff0c;队列中没有元素则阻塞等待&#xff0c;一直等到有元素就取出来
public E take() throws InterruptedException {final ReentrantLock lock &#61; this.lock;lock.lockInterruptibly();try {while (count &#61;&#61; 0)//队列中没有元素则阻塞等待,最终会调用LockSupport.park(this); 进行阻塞&#xff0c;然后等待元素入队时进行唤醒notEmpty.await();//有元素的话&#xff0c;直接从队列头中去取return dequeue();} finally {lock.unlock();}
}
元素出队操作&#xff1a;
//取队列元素
private E dequeue() {// assert lock.getHoldCount() &#61;&#61; 1;// assert items[takeIndex] !&#61; null;final Object[] items &#61; this.items;&#64;SuppressWarnings("unchecked")//获取到获取元素的指针的那个数据E x &#61; (E) items[takeIndex];//并且将该数据从队列中删除items[takeIndex] &#61; null;if (&#43;&#43;takeIndex &#61;&#61; items.length)//如果该元素已经是队列的最后一个元素了&#xff0c;那么下一个参数就是从头&#xff08;数组下标为0的&#xff09;开始获取takeIndex &#61; 0;//队列元素减一count--;if (itrs !&#61; null)itrs.elementDequeued();//一旦有数据出队&#xff0c;说明队列不满&#xff0c;即通知生产者去往队列中补充数据&#xff0c;(标识队列不满的信号量)notFull.signal();return x;
}
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();//从条件队列中获取第一个节点Node first &#61; firstWaiter;if (first !&#61; null)//第一个节点不为空时&#xff0c;去唤醒第二个节点中的线程&#xff08;此时这个线程应该是生产者&#xff09;&#xff0c;最终会调用LockSupport.unpark(node.thread);//为什么是取去唤醒第二个节点中的线程&#xff1f;因为真实存放线程的是从第二个节点开始的&#xff0c;详情可见ReentrantLock的源码解析doSignal(first);
}
至此&#xff0c;一个元素从生产者入队列&#xff0c;再到消费者出队列的一个完整的流程就走完了。源码解析中的 doSignal(first); 这块代码的解析我没有给贴出来&#xff0c;因为这里面的逻辑比较绕&#xff0c;是将一个Node节点在CLH队列和条件队列相互转移&#xff0c;比较复杂&#xff0c;有些地方我一时还表达不好&#xff0c;所以就不详细说明了&#xff0c;这里面的逻辑大家可以自己跟进去看一下。
ArrayBlockingQueue简单图解