作者:上海悠u7_ | 来源:互联网 | 2023-10-15 14:44
文章目录
- ⭐️导读
- 🚩基本构造
- 📖核心源码解读
- `put`方法
- `poll/take`方法
- `drainTo`方法
- `remove`方法
- ⚠️注意事项
- 🚗应用场景
⭐️导读
LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。
🚩基本构造
阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue
是提供Queue
的基本实现,我们重点关注Queue
、BlockingQueue
提供的api。
Queue
类提供最基本的API
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
BlockingQueue
提供阻塞操作相关API
void put(E e);
boolean offer(E e, long timeout, TimeUnit unit);
E take();
E poll(long timeout, TimeUnit unit);
int remainingCapacity();
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
&#x1f4d6;核心源码解读
LinkedBlockingQueue
分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition
来控制他们的执行过程
private final ReentrantLock takeLock &#61; new ReentrantLock();
private final Condition notEmpty &#61; takeLock.newCondition();
private final ReentrantLock putLock &#61; new ReentrantLock();
private final Condition notFull &#61; putLock.newCondition();
put
方法
将元素插入队列&#xff0c;如果队列没有可用空间则等待
public void put(E e) throws InterruptedException {if (e &#61;&#61; null) throw new NullPointerException();int c &#61; -1;Node<E> node &#61; new Node<E>(e);final ReentrantLock putLock &#61; this.putLock;final AtomicInteger count &#61; this.count;putLock.lockInterruptibly();try {while (count.get() &#61;&#61; capacity) {notFull.await();}enqueue(node);c &#61; count.getAndIncrement();if (c &#43; 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c &#61;&#61; 0)signalNotEmpty();
}
此处signalNotEmpty();
就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费
poll/take
方法
poll 查看头部元素&#xff0c;队列为空异常
take 移除并返回头部元素&#xff0c;如果没有可用则等待
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x &#61; null;int c &#61; -1;long nanos &#61; unit.toNanos(timeout);final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {while (count.get() &#61;&#61; 0) {if (nanos <&#61; 0)return null;nanos &#61; notEmpty.awaitNanos(nanos);}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}
此处signalNotFull();
是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入
take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下
public E take() throws InterruptedException {E x;int c &#61; -1;final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {while (count.get() &#61;&#61; 0) {notEmpty.await();}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}
drainTo
方法
从队列中取出全部的元素并插入到指定集合中
public int drainTo(Collection<? super E> c, int maxElements) {if (c &#61;&#61; null)throw new NullPointerException();if (c &#61;&#61; this)throw new IllegalArgumentException();if (maxElements <&#61; 0)return 0;boolean signalNotFull &#61; false;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lock();try {int n &#61; Math.min(maxElements, count.get());Node<E> h &#61; head;int i &#61; 0;try {while (i < n) {Node<E> p &#61; h.next;c.add(p.item);p.item &#61; null;h.next &#61; h;h &#61; p;&#43;&#43;i;}return n;} finally {if (i > 0) {head &#61; h;signalNotFull &#61; (count.getAndAdd(-i) &#61;&#61; capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}
}
remove
方法
public boolean remove(Object o) {if (o &#61;&#61; null) return false;fullyLock();try {for (Node<E> trail &#61; head, p &#61; trail.next;p !&#61; null;trail &#61; p, p &#61; p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}
注意一下此处的加锁逻辑
void fullyLock() {putLock.lock();takeLock.lock();
}
可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁
。
一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。
⚠️注意事项
使用LinkedBlockingQueue
时要额外注意影响性能的方法
如&#xff1a;remove
/contains
/toArray
/toString
/clear
以上方法的时间复杂度均为O(n)&#43;悲观锁
&#xff0c;如非必要最好不要使用
&#x1f697;应用场景
LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。
- 调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用
drainTo
归集然后统一请求。 - 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
- 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。
下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控
ResourcePool
对象池抽象类&#xff0c;实现该类就能初始化一个对象池
public abstract class ResourcePool<T extends ResourceModel> {private final LinkedBlockingQueue<T> queue;public ResourcePool(int poolMax) {queue &#61; new LinkedBlockingQueue<>(poolMax);for (int i &#61; 0; i < poolMax; i&#43;&#43;) {T model &#61; createResource();model.pool &#61; this;model.invalid &#61; true;queue.add(model);}}public T getResource() {try {do {T t &#61; queue.take();if (t.invalid) {t.invalid &#61; false;return open(t);}} while (true);} catch (InterruptedException e) {throw new RuntimeException(e);}}protected T open(T t) {return t;}protected abstract T createResource();public void free(T t) {if (!t.invalid) {t.invalid &#61; true;queue.offer(close(t));}}protected T close(T t) {return t;}}
ResourceModel
抽象对象
public abstract class ResourceModel implements Closeable {ResourcePool pool;boolean invalid;&#64;Overridepublic void close() throws IOException {pool.free(this);}
}
TestModel
对象实例
&#64;Setter
public class TestModel extends ResourceModel {public TestModel(String name, int age) {this.name &#61; name;this.age &#61; age;}private String name;private int age;}
TestPool
对象池实例
public class TestPool extends ResourcePool<TestModel> {public TestPool(int poolMax) {super(poolMax);}&#64;Overrideprotected TestModel createResource() {return new TestModel("", 0);}&#64;Overrideprotected TestModel open(TestModel testModel) {return super.open(testModel);}&#64;Overrideprotected TestModel close(TestModel testModel) {testModel.setAge(0);testModel.setName("");return super.close(testModel);}
}
使用方式1
public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);TestModel model &#61; testPool.getResource();model.close();
}
使用方式2
public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);try(TestModel model &#61; testPool.getResource()) {}
}