热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

BlockQueue,PriorityQueue,DelayQueue解析及其golang实现

前言在日常开发中,队列是常用到的数据结构,BlockQueue、PriorityQueue、DelayQueue更是其中的重点。BlockQueue即阻塞队列,当队列满时,继续

前言

  在日常开发中,队列是常用到的数据结构,BlockQueue、PriorityQueue、DelayQueue更是其中的重点。BlockQueue即阻塞队列,当队列满时,继续发送会阻塞;当队列为空时,继续接收会阻塞,是典型的生产-消费类型。PriorityQueue基于MaxHeap/MinHeap,保证队头的元素始终是最大值/最小值,在求max(n)(在m个数中找到最大/小的n个元素,其中m>>n)问题中具有良好的性能。DelayQueue则是延时队列,在定时任务中有重要的场景。此篇博客的契机是笔者在看kafka相关资料时,其定时/延时任务都基于TimeWheel算法(定时/延时调度常用的算法)。在kafka的TimeWheel中,将DelayQueue和TimeWheel结合起来,解决了DelayQueue插入/删除复杂度高和TimeWheel大量无效推进的问题。于是想深入了解一下此实现方式。刚好对DelayQueue不是很熟,因此重新温习了一下相关的重要数据结构。结合笔者目前的主语言golang,实现了golang实验版本的对应数据结构。


1. BlockQueue

1.1 BlockQueue实现原理

  BlockQueue即带有阻塞性质的队列。当队列为空时,读取会阻塞;当队列满时,写入会阻塞。底层存储可以基于数组或链表实现。对此java有对应的ArrayBlockingQueue和LinkedBlockingQueue实现。该队列的关键是阻塞的实现方式,以ArrayBlockingQueue为例,在java中,通过条件Condition实现,核心的实现如下:

class ArrayBlockingQueue {
final Object[] items; // 数组,保存队列的元素
// 读取数据数组下标
int takeIndex;
// 插入数据数组下标
int putIndex;
// 队列中数组元素个数
int count;
// 并发访问控制锁
final ReentrantLock lock;
// 条件队列
private final Condition notEmpty; // 用于实现阻塞和同志
private final Condition notFull;

// 阻塞入队put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
// 当队列满时,通过await挂起线程
enqueue(e); // 入队,内部会通过notEmpty.signal()通知take()阻塞的线程
} finally {
lock.unlock();
}
}

// 阻塞出队take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 当前队列空,让出资源等待
while (count == 0)
notEmpty.await();
// 当队列为空时,通过await挂起线程
return dequeue(); // 出队时,通过notFull.signal()通知put()阻塞的线程
} finally {
lock.unlock();
}
}

}

View Code

 

可见,实现阻塞队列的核心要点如下:



  • 队列是共享资源,为了线程安全,访问时需要加锁;

  • 当满足阻塞条件时,挂起线程;通过两个条件notFull和notEmpty唤醒挂起的线程


    • 有数据入队时(线程调用put),notEmpty.signal()->唤醒take阻塞的线程

    • 有数据出队时(线程调用take),notFull.signal()->唤醒put阻塞的线程



1.2 golang实现ArrayBlockingQueue

  golang中与线程对应的是协程(goroutine)。但是协程的启停通常不由用户代码控制(阻塞调用gopark ,启用可以通过schedule()让golang重新开启一次调度,以上函数均不能被外部包调用),协程阻塞通常分为以下几种场景:



  • 原子操作,互斥量,锁,读写channel导致goroutine阻塞;

  • 网络请求和IO操作导致goroutine阻塞;

  • 系统调用导致goroutine阻塞;

  • 调用sleep操作,goroutine阻塞。

当goroutine阻塞时,golang的调度器就会将此goroutine切出当前运行的线程,换一个可执行的goroutine继续在当前线程中运行。由于阻塞和启用操作比较底层,所以想利用java 的方式实现BlockQueue,操作会比较复杂。

 

   golang的channel,本身就具有队列的属性,同时其具有读写阻塞,并发安全的特性,是天然的BlockQueue。唯一的不足就是java的ArrayBlockingQueue中包含两个方法:element和peek,其作用是返回队头的元素,但是不取出来。想要实现这两个函数,用golang的channel就有点捉襟见肘,必须先遍历读取channel所有元素,然后再写回去,实现复杂度为O(n)(n为队列大小)。因此如果想要实现这两个函数的功能,就必须抛弃channel,按照java的方式去实现,底层仍然采用数组存储,那么如何阻塞并唤醒goroutine呢?答案还是channel(可以发现,想要实现goroutine之间的交互,channel是一个绕不过去的坎)。之前谈到goroutine阻塞时,有一种情况:读写channel阻塞。当读写cahnnel造成goroutine阻塞时,调度器会将阻塞的goroutine换出线程;而读写channel阻塞条件解除时,阻塞的goroutine又会重新唤醒。因此,只需要将java中ArrayBlockingQueue.Condition换成channel,就能实现goroutine之间的同步。

 

  还有一个比较大的问题是,对于线程级别切换的语言,比如C++,java,在线程阻塞处于等待并切出CPU时,锁会自动释放;当线程重新获取到CPU使用权进入运行态时,会重新获取锁。但是goroutine的切换,锁不会自动释放。如果在goroutine阻塞之前不手动释放锁,那么阻塞的goroutine就会一直持有锁;导致其他goroutine没法向队列里添加或获取元素,持有锁等待channel消息而阻塞的goroutine就会死锁。

 

  注意到了以上两个问题,实现代码就比较容易了,以下是相关代码的实现。

// Queue接口
type Queue interface {
Add(e
interface{}) error
Offer(e
interface{}) bool
Remove() (e
interface{}, err error)
Poll() (e
interface{}, getE bool)
Element() (e
interface{}, err error)
Peek() (e
interface{}, getE bool)
}
// BlockingQueue接口
type BlockingQueue interface {
Queue
Put(e
interface{}) (closed bool) // closed状态可以标记队列是否关闭,用于当队列不可用时,能从阻塞中返回
Take() (e interface{}, closed bool) // closed状态可以标记队列是否关闭,用于当队列不可用时,能从阻塞中返回
OfferWithTimeout(e interface{}, timeout time.Duration) (offerd bool, closed bool)
PollWithTimeout(timeout time.Duration) (e
interface{}, err error, closed bool)
}
// ArrayBlockingQueue及其实现
type ArrayBlockingQueue struct {
objects []
interface{} // 存储元素
takIdx int
putIdx
int
count
int
mutex
*sync.Mutex // 保护临界区资源objects
notEmpty chan bool // 通知队列不为空的channel
notFull chan bool // 通知队列没满的channel
}
// ArrayBlockingQueue初始化
func NewArrayBlockingQueue(cap int32) (*ArrayBlockingQueue, error) {
if cap <= 0 {
return nil, fmt.Errorf("cap of a queue should be greater than 0, got: %d", cap)
}
q :
= new(ArrayBlockingQueue)
q.objects
= make([]interface{}, cap)
q.notEmpty
= make(chan bool, 1)
q.notFull
= make(chan bool, 1)
q.mutex
= new(sync.Mutex)
return q, nil
}
// 解除阻塞,由发送方调用,调用Put函数的一方
func (q *ArrayBlockingQueue) DestroyFromSender() {
close(q.notEmpty)
}
// 解除阻塞,由接收方调用,调用Take函数的一方
func (q *ArrayBlockingQueue) DestroyFromReceiver() {
close(q.notFull)
}
// 入队,仅供ArrayBlockingQueue其他函数调用,在调用此函数前,已经加锁保证了临界区资源访问的安全性
func (q *ArrayBlockingQueue) enqueue(e interface{}) {
q.objects[q.putIdx]
= e
q.putIdx
+= 1
if q.putIdx == len(q.objects) {
q.putIdx
= 0
}
q.count
++
select {
case q.notEmpty <- true: // 此处一定是非阻塞式发送非空的信号
default:
return
}
}
// 出队,仅供ArrayBlockingQueue其他函数调用
func (q *ArrayBlockingQueue) dequeue() (e interface{}) {
e
= q.objects[q.takIdx]
q.objects[q.takIdx]
= nil
q.takIdx
+= 1
if q.takIdx == len(q.objects) {
q.takIdx
= 0
}
q.count
-= 1
select {
case q.notFull <- true: // 此处一定是非阻塞式发送非空的信号
default:
return e
}
return e
}
// Queue接口的相关功能都比较容易实现,限于篇幅不在此展示
// 阻塞式接口
//阻塞式Put
func (q *ArrayBlockingQueue) Put(e interface{}) (closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == len(q.objects) {
q.mutex.Unlock()
// 阻塞之前一定要释放锁,否则就会死锁
_, ok := <-q.notFull
if !ok {
// queue closed
q.mutex.Lock() // 判断队列是否已经关闭,关闭,则需要重新上锁,以便defer q.mutex.Unlock()解锁时不会报错
return true // 队列已关闭
}
q.mutex.Lock()
// 阻塞解除时,重新上锁,再次判断队列是否已满
}
q.enqueue(e)
return false
}
//阻塞式Take
func (q *ArrayBlockingQueue) Take() (e interface{}, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == 0 {
q.mutex.Unlock()
// 阻塞之前先解锁
if _, ok := <-q.notEmpty; !ok {
// queue closed
q.mutex.Lock() // 判断队列是否已经关闭,关闭,则需要重新上锁,以便defer q.mutex.Unlock()解锁时不会报错
return e, true
}
q.mutex.Lock()
// 阻塞解除重新上锁
}
return q.dequeue(), false
}
// 对于带timeout的take和put,实现如下
func (q *ArrayBlockingQueue) OfferWithTimeout(e interface{}, timeout time.Duration) (offered bool, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == len(q.objects) {
q.mutex.Unlock()
select {
case _, ok := <-q.notFull:
if !ok {
// queue closed
q.mutex.Lock()
return false, true
}
case <-time.After(timeout): // 在阻塞的基础上,增加了超时机制
q.mutex.Lock()
return false, false
}
q.mutex.Lock()
}
q.enqueue(e)
return true, false
}
func (q
*ArrayBlockingQueue) PollWithTimeout(timeout time.Duration) (e interface{}, err error, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == 0 {
q.mutex.Unlock()
select {
case _, ok := <-q.notEmpty:
if !ok {
// queue closed
q.mutex.Lock()
return e, nil, true
}
case <-time.After(timeout): // 增加了超时机制
q.mutex.Lock()
return e, errors.New("queue is empty"), false
}
q.mutex.Lock()
}
return q.dequeue(), nil, false
}

View Code

总结一下ArrayBlockingQueue的实现要点:



  • 通过channel代替java的Condition,实现goroutine的阻塞和唤醒

  • 在阻塞之前先手动释放锁;阻塞解除重新加锁

  • 增加channel是否已关闭的判断,从而在队列关闭时,能退出阻塞

下面是一个简单的示例,演示如何使用ArrayBlockingQueue:

func BlockConsumer(q *ArrayBlockingQueue, wg *sync.WaitGroup) {
for i := 0; i <100; i++ {
val, closed :
= q.Take()
if closed {
break
}
time.Sleep(time.Second
* 1)
fmt.Printf(
"q take succeed, val: %v, serial num: %d\n", val, i)
}
wg.Done()
}
func BlockProducer(q
*ArrayBlockingQueue, wg *sync.WaitGroup) {
for i := 0; i <100; i++ {
if closed := q.Put("put_" + strconv.Itoa(i)); closed {
break
}
fmt.Println(
"q put succeed, serial num: ", i)
}
fmt.Println(
"BlockProducer finished")
q.DestroyFromSender()
wg.Done()
}
func main() {
q, _ :
= NewArrayBlockingQueue(2)
wg :
= new(sync.WaitGroup)
wg.Add(
4)
// 一个生产者
go BlockProducer(q, wg)
/// 三个消费者
go BlockConsumer(q, wg)
go BlockConsumer(q, wg)
go BlockConsumer(q, wg)
wg.Wait()
}

View Code


2. PriorityQueue

  PriorityQueue底层依赖于一颗完全二叉树构造的最大堆(max_heap)或最小堆(min_heap),最大堆和最小堆有以下属性:



  • 堆是一颗完全二叉树;

  • 根节点的值大于子节点(最大堆)或小于子节点(最小堆);

  • 左右子树同样是一颗最大堆数/最小堆树。

由于PriorityQueue底层是完全二叉树,因此对于一个序号为i的节点,左孩子节点的序号为2i,右孩子节点的序号为2i+1,因此一般用数组来存储(数组序号为0的位置不存储任何元素)。同时根节点的元素始终是最大值(max_heap)或最小值(min_heap),这样每次从PriorityQueue取数时,都是取根节点的元素,取出来的值始终有序,这也是PriorityQueue如此命名的原因。

 

  在我之前的文章里,有详细介绍max_heap和min_heap的性质和相关函数实现:max_heap与min_heap ,在此不再赘述。由于PriorityQueue的实现不涉及到编程语言的特性,在此也不再附上go代码实现,感兴趣的读者可以自行依照原理去实现。

 

  PriorityQueue在Top K问题上有广泛的运用,感兴趣的同学,可以练习一下Leetcode上PriorityQueue相关的题目,如下:



  • Leetcode 215:第K个最大元素

  • Leetcode 347:前K个高频元素

  • Leetcode 239:滑动窗口的最大值

  • 剑指offer 41:数据流中的中位数


3. DelayQueue

3.1 DelayQueue实现原理

  DelayQueue是一个延时队列,队头的元素始终是到期时间最早的。获取元素时,首先判断队头元素的delay是否小于等于0;当小于等于0时,才能取出;否则阻塞直到delay小于等于0。DelayQueue的特性如下:



  • 需要按延期时间有序出队,这个就是利用PriorityQueue实现的(按照延时时间delay存到优先队列里);

  • 当延时没到时,获取队列元素会造成阻塞,这部分的性质又跟BlockQueue相似,其底层也是延时未到时,阻塞线程;然后在延时完成时唤醒线程。

 因此,DelayQueue是PriorityQueue+BlockQueue的组合,广泛用于延时任务中。

 

   我们首先来看一下java的实现方式。由于DelayQueue是根据元素延时时间确定出队顺序的,因此要求每个元素实现了Delayed接口,包含getDelay(TimeUnit.NANOSECONDS)方法,用于获取剩余的延时时间。DelayQueue的核心数据结构如下:

class DelayQueue {
private final transient ReentrantLock lock;
private final PriorityQueue q; // 优先队列,用于底层存储
private Thread leader = null; // 最先消费的等待线程
private final Condition available; // 可用条件
...
}

 DelayQueue的内部数据跟BlockQueue整体差不多,也比较好理解。然后是DelayQueue的两个核心方法:

1 // 存入元素
2 public boolean offer(E e) {
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 q.offer(e); // 优先队列入队,非阻塞的
7 if (q.peek() == e) {
8 // 说明入队的元素delay时间最小,应当唤醒线程
9 leader = null; // important
10 available.signal(); // 唤醒其他等待的线程
11 }
12 return true
13 } finally {
14 lock.unlock();
15 }
16 }
17
18 // 阻塞获取元素
19 public E take() throws InterruptedException {
20 final ReentrantLock lock = this.lock;
21 lock.lockInterruptibly(); // 上锁
22 try {
23 // 阻塞式获取元素
24 for(;;) {
25 // 查看队头元素是否延时到期
26 E first = q.peek();
27 if (first == null)
28 available.await(); // 队头元素为空,说明没元素,等待
29 else {
30 long delay = first.getDelay(NANOSECONDS) // 获取延时
31 if (delay <= 0)
32 return q.poll() // 元素到期,直接返回
33 else {
34 // 需要等待
35 first = null; // 释放队头元素引用,方便其他线程获取元素并释放内存
36 // 以下部分是DelayQueue的精髓部分
37 if (leader != null)
38 // 说明已经有定时等待线程,只需要阻塞即可
39 available.await()
40 else {
41 // 说明没有定时等待线程,那么当前线程设置为定时等待线程,等待的时间为delay,保证延时结束能第一时间消费
42 Thread thisThread = Thread.currentThread();
43 leader = thisThread;
44 try {
45 available.awaitNanos(delay); // 定时等待
46 } finally {
47 if (leader == thisThread)
48 leader = null // 重置定时等待线程
49 }
50 }
51 }
52 }
53 }
54 } finally {
55 // 如果 leader 为空,且队列不为空,唤醒一个消费线程
56 if (leader == null && q.peek() != null)
57 available.signal();
58 lock.unlock();
59 }
60 }

 

整个流程比较复杂,在此引用【Java多线程】DelayQueue源码分析 (二十六) - H__D - 博客园总结的一张图,对整个过程进行总结:

 

 

DelayQueue最难理解的部分是其leader成员变量的设置。假设以下场景:

consumer1调用take()时,队列没有元素。这个时候consumer1线程直接阻塞,leader=null --> producer1调用offer(),放入元素element1,delay=d1, 通过available.signal()唤醒consumer1 --> consumer1被唤醒,发现队列队头有元素element1且其dealy=d1>0,此时leader=consumer1, consumer1定时等待d1时长 --> 此时,又增加了若干消费者consumer_i(i=2,3,4..),依照源码,leader != null, 其他若干consumer都会无限时长阻塞 --> 然后又进来了一个元素element2,该元素的delay=d2比当前队头元素小(d1>d2>0),因此会调用available.signal(),那么可能唤醒:

(1)leader线程,leader线程重置leader=null,然后查看队头元素的delay,重置该delay;

(2)非leader线程,此时 队头元素delay>0,同时由于入队时在available.signal()之前,重置了leader=null;所以此时被唤醒的线程会被设置成定时线程,定时时长为d1。保证了队头元素能按时唤醒。

 综上,可以看出leader变量的重要性,保证队头元素的一定是定时等待,能按时唤醒消费线程。


3.2 golang实现DelayQueue


3.2.1 实现PriorityQueue

DelayQueue依赖PriorityQueue,因此需要先实现一个PriorityQueue。基于golang的heap实现PriorityQueue,其中的元素需要有getDelay()方法:

1 // 优先队列中的元素
2 type PqItem struct {
3 Item interface{} // 存放元素
4 Priority int64 // 优先级,在此以秒为单位的timestamp
5 }
6
7 func (i *PqItem) getDelay() int64 {
8 return i.Priority - time.Now().Unix()
9 }
10
11 // 优先队列,需要实现Len, Less, Swap, Push, Pop接口。同时为了方便获取队头元素,新增Peek方法
12 type PriorityQueue []*PqItem
13
14 func (pq *PriorityQueue) Len() int {
15 return len(*pq)
16 }
17
18 func (pq *PriorityQueue) Less(i, j int) bool {
19 return (*pq)[i].Priority <(*pq)[j].Priority
20 }
21
22 func (pq *PriorityQueue) Swap(i, j int) {
23 (*pq)[i], (*pq)[j] = (*pq)[j], (*pq)[i]
24 }
25
26 func (pq *PriorityQueue) Push(e interface{}) {
27 item, ok := e.(*PqItem)
28 if !ok {
29 return
30 }
31 *pq = append(*pq, item)
32 }
33
34 func (pq *PriorityQueue) Pop() interface{} {
35 n := len(*pq)
36 e := (*pq)[n-1]
37 *pq = (*pq)[0 : n-1]
38 return e
39 }
40
41 // 新增Peek和Clear方法
42 func (pq *PriorityQueue) Peek() *PqItem {
43 return (*pq)[0]
44 }
45
46 // Clear方法主要用在关闭队列时,清空队列元素,保证收发能收到队列关闭的信号
47 func (pq *PriorityQueue) Clear() {
48 *pq = (*pq)[0:0]
49 }

View Code

 以上就是基于golang heap包实现的PriorityQueue,使用时:

// 初始化:
pq := PriorityQueue{
// 填充好元素
}
heap.Init(
&pq)
// 入队
heap.Push(&pq, &pqItem)
// 出队
heap.Pop(&pq)

3.2.2 获取goroutine ID

  golang中,没有与java Thread.currentThread()等价的函数,因此需要自己封装,用goroutine ID来唯一标记一个goroutine。网上有大量的goroutine ID获取方法,也可以参考3.8 例子:Goroutine ID · Go语言高级编程。本博客选取了实现较简单,性能较差的版本,如下:

func GetGId() int {
var buf [
64]byte
n :
= runtime.Stack(buf[:], false)
idField :
= strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
id, err :
= strconv.Atoi(idField)
if err != nil {
panic(fmt.Sprintf(
"cannot get goroutine id: %v", err))
}
return id
}

3.2.3 实现DelayQueue

  在此主要演示DelayQueue的Take和Offer方法。

1 type DelayQueue struct {
2 pq PriorityQueue
3 mutex *sync.Mutex
4 available chan bool
5 leader int // 存放goroutine ID
6 closed bool // 标记available是否关闭
7 }
8
9 /* 带初始元素的DelayQueue初始化
10 * mems: 数据; delays: mems中的数据对应的延时,单位ms
11 */
12 func NewDelayQueue(mems []interface{}, delays []int64) (*DelayQueue, error) {
13 if len(delays) != len(mems) {
14 return nil, errors.New("len of delays isn't equal to len of mems")
15 }
16 pq := make(PriorityQueue, len(mems))
17 for idx, mem := range mems {
18 pq[idx] = &PqItem{
19 Priority: time.Now().Unix() + delays[idx],
20 Item: mem,
21 }
22 }
23 heap.Init(&pq) // PriorityQueue初始化
24
25 dq := &DelayQueue{
26 pq: pq,
27 mutex: new(sync.Mutex),
28 available: make(chan bool, 1),
29 leader: -1,
30 closed: false,
31 }
32 return dq, nil
33 }
34
35 // 入队, 本质上就是调用优先队列的入队操作
36 func (dq *DelayQueue) Offer(e interface{}, delay int64) (closed bool) {
37 dq.mutex.Lock()
38 defer func() {
39 // 用来捕获chanel close的异常,对于发送端来说,往一个close的channel发送数据会panic,只有通过recover才能知道available已经关闭
40 if err := recover(); err != nil {
41 dq.closed = true
42 closed = true
43 }
44 dq.mutex.Unlock()
45 }()
46 // 先判断queue是否关闭,关闭直接返回
47 if dq.closed {
48 return dq.closed
49 }
50 item := &PqItem{
51 Item: e,
52 Priority: time.Now().Unix() + delay,
53 }
54 heap.Push(&(dq.pq), item) // 放入优先队列
55 if dq.pq.Peek() == item {
56 // 注:由于不是每次入队都发送available,导致如果available关闭,发送端也不能及时检测到队列关闭;在close之前必须先清空available队列的元素。
57 dq.leader = -1 // very important
58 select {
59 case dq.available <- true:
60 return closed
61 default:
62 return closed
63 }
64 }
65 return closed
66 }
67
68 // 出队
69 func (dq *DelayQueue) Take() (e interface{}, err error, closed bool) {
70 dq.mutex.Lock()
71 defer func() {
72 if dq.leader == -1 && dq.pq.Len() > 0 && !closed {
73 // 一定要判断队列是否关闭,关闭了剩余的历史数据无法获取
74 select {
75 case dq.available <- true:
76 default:
77 }
78 }
79 dq.mutex.Unlock()
80 }()
81 if dq.closed {
82 return nil, nil, dq.closed
83 }
84 for {
85 for dq.pq.Len() == 0 {
86 // 无数据
87 dq.mutex.Unlock()
88 _, ok := <-dq.available
89 dq.mutex.Lock() // 先上锁,再判断是不是关闭
90 if !ok {
91 // 队列已经关闭
92 return nil, nil, true
93 }
94 }
95 first := dq.pq.Peek()
96 delay := first.getDelay()
97 if delay <= 0 {
98 PqItem := heap.Pop(&(dq.pq)).(*PqItem)
99 return PqItem.Item, nil, false
100 } else {
101 // 没到到期时间,需要等待
102 first = nil // 释放引用
103 gId := GetGId()
104 if dq.leader != -1 {
105 // 前面有等待的goroutine,此goroutine休眠,
106 dq.mutex.Unlock()
107 _, ok := <-dq.available
108 dq.mutex.Lock()
109 if !ok {
110 return nil, nil, true
111 }
112 } else {
113 // 前面没有等待的goroutine, 设置当前的goroutine进入定时等待
114 dq.leader = gId
115 dq.mutex.Unlock()
116 select {
117 case <-dq.available:
118 case <-time.After(time.Second * time.Duration(delay)):
119 }
120 dq.mutex.Lock()
121 if dq.leader == gId {
122 dq.leader = -1
123 }
124 }
125 }
126 }
127 return nil, nil, false
128 }
129
130 // 最后再实现一个Exit
131 func (dq *DelayQueue) Exit() {
132 dq.mutex.Lock()
133 dq.pq.Clear() // 清空队列
134 close(dq.available) // 关闭channel
135 dq.mutex.Unlock()
136 }

View Code


3.2.4 测试

  编写一个consumer和producer:

1 func DQProducer(dq *DelayQueue) {
2 time.Sleep(time.Second * 14)
3 num := []int{87, 59, 18, 40, 0, 11, 89, 74}
4 delay := []int{1, 7, 3, 5, 6, 4, 2, 8}
5 for i := 0; i {
6 if closed := dq.Offer(num[i], int64(delay[i])); closed {
7 break
8 }
9 //fmt.Printf("add num: %d, delay: %d\n", val, delay)
10 time.Sleep(time.Second)
11 }
12 }
13
14 func DQConsumer(dq *DelayQueue) {
15 gId := GetGId()
16 for {
17 item, err, closed := dq.Take()
18 if err != nil || closed {
19 return
20 }
21 fmt.Printf("consumer %d get value: %v, timestamp: %d\n", gId, item, time.Now().Unix())
22 }
23 }
24
25 func main() {
26 mems := []interface{}{4, 3, 7, 3, 43, 13, 23, 53}
27 delays := []int64{1, 3, 4, 7, 10, 8, 13, 12}
28 dq, _ := NewDelayQueue(mems, delays)
29 for i := 0; i <50; i++ {
30 go DQConsumer(dq, i+1)
31 }
32 go DQProducer(dq)
33 //go DQProducer(dq)
34 //go DQProducer(dq)
35 time.Sleep(time.Second * 30)
36 dq.Exit()
37 fmt.Println("queue has been closed")
38 time.Sleep(time.Second * 30)
39 }

View Code

 

首先DQProducer先sleep 14s,保证初始化的元素先消费完,顺便演示一下正常情况下的consumer情况。然后Producer每隔1s生产一个元素。执行结果如下:

// 正常消费
consumer 18 get value: 4, timestamp: 1653828071
consumer
23 get value: 3, timestamp: 1653828073
consumer
21 get value: 7, timestamp: 1653828074
consumer
22 get value: 3, timestamp: 1653828078
consumer
19 get value: 13, timestamp: 1653828078
consumer
39 get value: 43, timestamp: 1653828080
consumer
24 get value: 53, timestamp: 1653828082
consumer
25 get value: 23, timestamp: 1653828083
consumer
26 get value: 87, timestamp: 1653828085
consumer
28 get value: 18, timestamp: 1653828089
consumer
27 get value: 59, timestamp: 1653828092
consumer
27 get value: 40, timestamp: 1653828092
consumer
29 get value: 89, timestamp: 1653828094 // 89在11之后入队,虽然delay一样,理论上应该是11 在前, 可能原因是终端打印顺序问题
consumer 20 get value: 11, timestamp: 1653828094
consumer
20 get value: 0, timestamp: 1653828094
consumer
35 get value: 74, timestamp: 1653828100
queue has been closed

3.3 DelayQueue总结

  上面的分析与实现,基本覆盖了DelayQueue的所有要点,在此总结一下:



  • golang不存在goroutine休眠自动释放锁,所以在进入休眠之前一定要手动释放锁;重启之后再手动加锁;

  • golang的channel阻塞读写和非阻塞读写一定要分开(某些场景下是阻塞的,某些是非阻塞的);

  • leader的设置尤其重要(分析见3.1)

以上实现非常简单粗暴,基本参考java的实现方式,存在频繁的加锁解锁,以及获取goroutine ID的函数性能较差,仅能学习使用。goroutine之间的同步,既要考虑channel的关闭,阻塞和非阻塞,也要频繁的在阻塞前后释放锁和加锁,实现起来非常容易出错。想要实现一个应用级的DelayQueue,还有很多优化。由于golang和java的很多特性也不一样,可能也有完全不一样的golang实现版本。



推荐阅读
author-avatar
女女的家_747
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有