作者:上海悠u7_ | 来源:互联网 | 2023-10-15 14:44
文章目录 ⭐️导读 🚩基本构造 📖核心源码解读 `put`方法 `poll/take`方法 `drainTo`方法 `remove`方法 ⚠️注意事项 🚗应用场景
⭐️导读 LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。
🚩基本构造
阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue
是提供Queue
的基本实现,我们重点关注Queue
、BlockingQueue
提供的api。
Queue
类提供最基本的API
boolean add ( E e) ; boolean offer ( E e) ; E remove ( ) ; E poll ( ) ; E element ( ) ; E peek ( ) ;
BlockingQueue
提供阻塞操作相关API
void put ( E e) ; boolean offer ( E e, long timeout, TimeUnit unit) &#xff1b;E take ( ) ; E poll ( long timeout, TimeUnit unit) ; int remainingCapacity ( ) ; int drainTo ( Collection < ? super E > c) ; int drainTo ( Collection < ? super E > c, int maxElements) ;
&#x1f4d6;核心源码解读 LinkedBlockingQueue
分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition
来控制他们的执行过程
private final ReentrantLock takeLock &#61; new ReentrantLock ( ) ; private final Condition notEmpty &#61; takeLock. newCondition ( ) ; private final ReentrantLock putLock &#61; new ReentrantLock ( ) ; private final Condition notFull &#61; putLock. newCondition ( ) ;
put
方法 将元素插入队列&#xff0c;如果队列没有可用空间则等待
public void put ( E e) throws InterruptedException { if ( e &#61;&#61; null ) throw new NullPointerException ( ) ; int c &#61; - 1 ; Node < E > node &#61; new Node < E > ( e) ; final ReentrantLock putLock &#61; this . putLock; final AtomicInteger count &#61; this . count; putLock. lockInterruptibly ( ) ; try { while ( count. get ( ) &#61;&#61; capacity) { notFull. await ( ) ; } enqueue ( node) ; c &#61; count. getAndIncrement ( ) ; if ( c &#43; 1 < capacity) notFull. signal ( ) ; } finally { putLock. unlock ( ) ; } if ( c &#61;&#61; 0 ) signalNotEmpty ( ) ; }
此处signalNotEmpty();
就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费
poll/take
方法 poll 查看头部元素&#xff0c;队列为空异常
take 移除并返回头部元素&#xff0c;如果没有可用则等待
public E poll ( long timeout, TimeUnit unit) throws InterruptedException { E x &#61; null ; int c &#61; - 1 ; long nanos &#61; unit. toNanos ( timeout) ; final AtomicInteger count &#61; this . count; final ReentrantLock takeLock &#61; this . takeLock; takeLock. lockInterruptibly ( ) ; try { while ( count. get ( ) &#61;&#61; 0 ) { if ( nanos <&#61; 0 ) return null ; nanos &#61; notEmpty. awaitNanos ( nanos) ; } x &#61; dequeue ( ) ; c &#61; count. getAndDecrement ( ) ; if ( c > 1 ) notEmpty. signal ( ) ; } finally { takeLock. unlock ( ) ; } if ( c &#61;&#61; capacity) signalNotFull ( ) ; return x; }
此处signalNotFull();
是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入
take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下
public E take ( ) throws InterruptedException { E x; int c &#61; - 1 ; final AtomicInteger count &#61; this . count; final ReentrantLock takeLock &#61; this . takeLock; takeLock. lockInterruptibly ( ) ; try { while ( count. get ( ) &#61;&#61; 0 ) { notEmpty. await ( ) ; } x &#61; dequeue ( ) ; c &#61; count. getAndDecrement ( ) ; if ( c > 1 ) notEmpty. signal ( ) ; } finally { takeLock. unlock ( ) ; } if ( c &#61;&#61; capacity) signalNotFull ( ) ; return x; }
drainTo
方法 从队列中取出全部的元素并插入到指定集合中
public int drainTo ( Collection < ? super E > c, int maxElements) { if ( c &#61;&#61; null ) throw new NullPointerException ( ) ; if ( c &#61;&#61; this ) throw new IllegalArgumentException ( ) ; if ( maxElements <&#61; 0 ) return 0 ; boolean signalNotFull &#61; false ; final ReentrantLock takeLock &#61; this . takeLock; takeLock. lock ( ) ; try { int n &#61; Math . min ( maxElements, count. get ( ) ) ; Node < E > h &#61; head; int i &#61; 0 ; try { while ( i < n) { Node < E > p &#61; h. next; c. add ( p. item) ; p. item &#61; null ; h. next &#61; h; h &#61; p; &#43;&#43; i; } return n; } finally { if ( i > 0 ) { head &#61; h; signalNotFull &#61; ( count. getAndAdd ( - i) &#61;&#61; capacity) ; } } } finally { takeLock. unlock ( ) ; if ( signalNotFull) signalNotFull ( ) ; } }
remove
方法 public boolean remove ( Object o) { if ( o &#61;&#61; null ) return false ; fullyLock ( ) ; try { for ( Node < E > trail &#61; head, p &#61; trail. next; p !&#61; null ; trail &#61; p, p &#61; p. next) { if ( o. equals ( p. item) ) { unlink ( p, trail) ; return true ; } } return false ; } finally { fullyUnlock ( ) ; } }
注意一下此处的加锁逻辑
void fullyLock ( ) { putLock. lock ( ) ; takeLock. lock ( ) ; }
可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁
。
一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。
⚠️注意事项 使用LinkedBlockingQueue
时要额外注意影响性能的方法
如&#xff1a;remove
/contains
/toArray
/toString
/clear
以上方法的时间复杂度均为O(n)&#43;悲观锁
&#xff0c;如非必要最好不要使用
&#x1f697;应用场景 LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。
调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用drainTo
归集然后统一请求。 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。 下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控
ResourcePool
对象池抽象类&#xff0c;实现该类就能初始化一个对象池
public abstract class ResourcePool < T extends ResourceModel > { private final LinkedBlockingQueue < T > queue; public ResourcePool ( int poolMax) { queue &#61; new LinkedBlockingQueue < > ( poolMax) ; for ( int i &#61; 0 ; i < poolMax; i&#43;&#43; ) { T model &#61; createResource ( ) ; model. pool &#61; this ; model. invalid &#61; true ; queue. add ( model) ; } } public T getResource ( ) { try { do { T t &#61; queue. take ( ) ; if ( t. invalid) { t. invalid &#61; false ; return open ( t) ; } } while ( true ) ; } catch ( InterruptedException e) { throw new RuntimeException ( e) ; } } protected T open ( T t) { return t; } protected abstract T createResource ( ) ; public void free ( T t) { if ( ! t. invalid) { t. invalid &#61; true ; queue. offer ( close ( t) ) ; } } protected T close ( T t) { return t; } }
ResourceModel
抽象对象
public abstract class ResourceModel implements Closeable { ResourcePool pool; boolean invalid; &#64;Override public void close ( ) throws IOException { pool. free ( this ) ; } }
TestModel
对象实例
&#64;Setter public class TestModel extends ResourceModel { public TestModel ( String name, int age) { this . name &#61; name; this . age &#61; age; } private String name; private int age; }
TestPool
对象池实例
public class TestPool extends ResourcePool < TestModel > { public TestPool ( int poolMax) { super ( poolMax) ; } &#64;Override protected TestModel createResource ( ) { return new TestModel ( "" , 0 ) ; } &#64;Override protected TestModel open ( TestModel testModel) { return super . open ( testModel) ; } &#64;Override protected TestModel close ( TestModel testModel) { testModel. setAge ( 0 ) ; testModel. setName ( "" ) ; return super . close ( testModel) ; } }
使用方式1
public static void main ( String [ ] args) throws IOException { TestPool testPool &#61; new TestPool ( 30 ) ; TestModel model &#61; testPool. getResource ( ) ; model. close ( ) ; }
使用方式2
public static void main ( String [ ] args) throws IOException { TestPool testPool &#61; new TestPool ( 30 ) ; try ( TestModel model &#61; testPool. getResource ( ) ) { } }