在日常开发中,队列是常用到的数据结构,BlockQueue、PriorityQueue、DelayQueue更是其中的重点。BlockQueue即阻塞队列,当队列满时,继续发送会阻塞;当队列为空时,继续接收会阻塞,是典型的生产-消费类型。PriorityQueue基于MaxHeap/MinHeap,保证队头的元素始终是最大值/最小值,在求max(n)(在m个数中找到最大/小的n个元素,其中m>>n)问题中具有良好的性能。DelayQueue则是延时队列,在定时任务中有重要的场景。此篇博客的契机是笔者在看kafka相关资料时,其定时/延时任务都基于TimeWheel算法(定时/延时调度常用的算法)。在kafka的TimeWheel中,将DelayQueue和TimeWheel结合起来,解决了DelayQueue插入/删除复杂度高和TimeWheel大量无效推进的问题。于是想深入了解一下此实现方式。刚好对DelayQueue不是很熟,因此重新温习了一下相关的重要数据结构。结合笔者目前的主语言golang,实现了golang实验版本的对应数据结构。
BlockQueue即带有阻塞性质的队列。当队列为空时,读取会阻塞;当队列满时,写入会阻塞。底层存储可以基于数组或链表实现。对此java有对应的ArrayBlockingQueue和LinkedBlockingQueue实现。该队列的关键是阻塞的实现方式,以ArrayBlockingQueue为例,在java中,通过条件Condition实现,核心的实现如下:
class ArrayBlockingQueue {
final Object[] items; // 数组,保存队列的元素
// 读取数据数组下标
int takeIndex;
// 插入数据数组下标
int putIndex;
// 队列中数组元素个数
int count;
// 并发访问控制锁
final ReentrantLock lock;
// 条件队列
private final Condition notEmpty; // 用于实现阻塞和同志
private final Condition notFull;
// 阻塞入队put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 当队列满时,通过await挂起线程
enqueue(e); // 入队,内部会通过notEmpty.signal()通知take()阻塞的线程
} finally {
lock.unlock();
}
}
// 阻塞出队take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 当前队列空,让出资源等待
while (count == 0)
notEmpty.await(); // 当队列为空时,通过await挂起线程
return dequeue(); // 出队时,通过notFull.signal()通知put()阻塞的线程
} finally {
lock.unlock();
}
}
}
View Code
可见,实现阻塞队列的核心要点如下:
golang中与线程对应的是协程(goroutine)。但是协程的启停通常不由用户代码控制(阻塞调用gopark ,启用可以通过schedule()让golang重新开启一次调度,以上函数均不能被外部包调用),协程阻塞通常分为以下几种场景:
当goroutine阻塞时,golang的调度器就会将此goroutine切出当前运行的线程,换一个可执行的goroutine继续在当前线程中运行。由于阻塞和启用操作比较底层,所以想利用java 的方式实现BlockQueue,操作会比较复杂。
golang的channel,本身就具有队列的属性,同时其具有读写阻塞,并发安全的特性,是天然的BlockQueue。唯一的不足就是java的ArrayBlockingQueue中包含两个方法:element和peek,其作用是返回队头的元素,但是不取出来。想要实现这两个函数,用golang的channel就有点捉襟见肘,必须先遍历读取channel所有元素,然后再写回去,实现复杂度为O(n)(n为队列大小)。因此如果想要实现这两个函数的功能,就必须抛弃channel,按照java的方式去实现,底层仍然采用数组存储,那么如何阻塞并唤醒goroutine呢?答案还是channel(可以发现,想要实现goroutine之间的交互,channel是一个绕不过去的坎)。之前谈到goroutine阻塞时,有一种情况:读写channel阻塞。当读写cahnnel造成goroutine阻塞时,调度器会将阻塞的goroutine换出线程;而读写channel阻塞条件解除时,阻塞的goroutine又会重新唤醒。因此,只需要将java中ArrayBlockingQueue.Condition换成channel,就能实现goroutine之间的同步。
还有一个比较大的问题是,对于线程级别切换的语言,比如C++,java,在线程阻塞处于等待并切出CPU时,锁会自动释放;当线程重新获取到CPU使用权进入运行态时,会重新获取锁。但是goroutine的切换,锁不会自动释放。如果在goroutine阻塞之前不手动释放锁,那么阻塞的goroutine就会一直持有锁;导致其他goroutine没法向队列里添加或获取元素,持有锁等待channel消息而阻塞的goroutine就会死锁。
注意到了以上两个问题,实现代码就比较容易了,以下是相关代码的实现。
// Queue接口
type Queue interface {
Add(e interface{}) error
Offer(e interface{}) bool
Remove() (e interface{}, err error)
Poll() (e interface{}, getE bool)
Element() (e interface{}, err error)
Peek() (e interface{}, getE bool)
}
// BlockingQueue接口
type BlockingQueue interface {
Queue
Put(e interface{}) (closed bool) // closed状态可以标记队列是否关闭,用于当队列不可用时,能从阻塞中返回
Take() (e interface{}, closed bool) // closed状态可以标记队列是否关闭,用于当队列不可用时,能从阻塞中返回
OfferWithTimeout(e interface{}, timeout time.Duration) (offerd bool, closed bool)
PollWithTimeout(timeout time.Duration) (e interface{}, err error, closed bool)
}
// ArrayBlockingQueue及其实现
type ArrayBlockingQueue struct {
objects []interface{} // 存储元素
takIdx int
putIdx int
count int
mutex *sync.Mutex // 保护临界区资源objects
notEmpty chan bool // 通知队列不为空的channel
notFull chan bool // 通知队列没满的channel
}
// ArrayBlockingQueue初始化
func NewArrayBlockingQueue(cap int32) (*ArrayBlockingQueue, error) {
if cap <= 0 {
return nil, fmt.Errorf("cap of a queue should be greater than 0, got: %d", cap)
}
q := new(ArrayBlockingQueue)
q.objects = make([]interface{}, cap)
q.notEmpty = make(chan bool, 1)
q.notFull = make(chan bool, 1)
q.mutex = new(sync.Mutex)
return q, nil
}
// 解除阻塞,由发送方调用,调用Put函数的一方
func (q *ArrayBlockingQueue) DestroyFromSender() {
close(q.notEmpty)
}
// 解除阻塞,由接收方调用,调用Take函数的一方
func (q *ArrayBlockingQueue) DestroyFromReceiver() {
close(q.notFull)
}
// 入队,仅供ArrayBlockingQueue其他函数调用,在调用此函数前,已经加锁保证了临界区资源访问的安全性
func (q *ArrayBlockingQueue) enqueue(e interface{}) {
q.objects[q.putIdx] = e
q.putIdx += 1
if q.putIdx == len(q.objects) {
q.putIdx = 0
}
q.count++
select {
case q.notEmpty <- true: // 此处一定是非阻塞式发送非空的信号
default:
return
}
}
// 出队,仅供ArrayBlockingQueue其他函数调用
func (q *ArrayBlockingQueue) dequeue() (e interface{}) {
e = q.objects[q.takIdx]
q.objects[q.takIdx] = nil
q.takIdx += 1
if q.takIdx == len(q.objects) {
q.takIdx = 0
}
q.count -= 1
select {
case q.notFull <- true: // 此处一定是非阻塞式发送非空的信号
default:
return e
}
return e
}
// Queue接口的相关功能都比较容易实现,限于篇幅不在此展示
// 阻塞式接口
//阻塞式Put
func (q *ArrayBlockingQueue) Put(e interface{}) (closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == len(q.objects) {
q.mutex.Unlock() // 阻塞之前一定要释放锁,否则就会死锁
_, ok := <-q.notFull
if !ok {
// queue closed
q.mutex.Lock() // 判断队列是否已经关闭,关闭,则需要重新上锁,以便defer q.mutex.Unlock()解锁时不会报错
return true // 队列已关闭
}
q.mutex.Lock() // 阻塞解除时,重新上锁,再次判断队列是否已满
}
q.enqueue(e)
return false
}
//阻塞式Take
func (q *ArrayBlockingQueue) Take() (e interface{}, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == 0 {
q.mutex.Unlock() // 阻塞之前先解锁
if _, ok := <-q.notEmpty; !ok {
// queue closed
q.mutex.Lock() // 判断队列是否已经关闭,关闭,则需要重新上锁,以便defer q.mutex.Unlock()解锁时不会报错
return e, true
}
q.mutex.Lock() // 阻塞解除重新上锁
}
return q.dequeue(), false
}
// 对于带timeout的take和put,实现如下
func (q *ArrayBlockingQueue) OfferWithTimeout(e interface{}, timeout time.Duration) (offered bool, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == len(q.objects) {
q.mutex.Unlock()
select {
case _, ok := <-q.notFull:
if !ok {
// queue closed
q.mutex.Lock()
return false, true
}
case <-time.After(timeout): // 在阻塞的基础上,增加了超时机制
q.mutex.Lock()
return false, false
}
q.mutex.Lock()
}
q.enqueue(e)
return true, false
}
func (q *ArrayBlockingQueue) PollWithTimeout(timeout time.Duration) (e interface{}, err error, closed bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count == 0 {
q.mutex.Unlock()
select {
case _, ok := <-q.notEmpty:
if !ok {
// queue closed
q.mutex.Lock()
return e, nil, true
}
case <-time.After(timeout): // 增加了超时机制
q.mutex.Lock()
return e, errors.New("queue is empty"), false
}
q.mutex.Lock()
}
return q.dequeue(), nil, false
}
View Code
总结一下ArrayBlockingQueue的实现要点:
下面是一个简单的示例,演示如何使用ArrayBlockingQueue:
func BlockConsumer(q *ArrayBlockingQueue, wg *sync.WaitGroup) {
for i := 0; i <100; i++ {
val, closed := q.Take()
if closed {
break
}
time.Sleep(time.Second * 1)
fmt.Printf("q take succeed, val: %v, serial num: %d\n", val, i)
}
wg.Done()
}
func BlockProducer(q *ArrayBlockingQueue, wg *sync.WaitGroup) {
for i := 0; i <100; i++ {
if closed := q.Put("put_" + strconv.Itoa(i)); closed {
break
}
fmt.Println("q put succeed, serial num: ", i)
}
fmt.Println("BlockProducer finished")
q.DestroyFromSender()
wg.Done()
}
func main() {
q, _ := NewArrayBlockingQueue(2)
wg := new(sync.WaitGroup)
wg.Add(4)
// 一个生产者
go BlockProducer(q, wg)
/// 三个消费者
go BlockConsumer(q, wg)
go BlockConsumer(q, wg)
go BlockConsumer(q, wg)
wg.Wait()
}
View Code
PriorityQueue底层依赖于一颗完全二叉树构造的最大堆(max_heap)或最小堆(min_heap),最大堆和最小堆有以下属性:
由于PriorityQueue底层是完全二叉树,因此对于一个序号为i的节点,左孩子节点的序号为2i,右孩子节点的序号为2i+1,因此一般用数组来存储(数组序号为0的位置不存储任何元素)。同时根节点的元素始终是最大值(max_heap)或最小值(min_heap),这样每次从PriorityQueue取数时,都是取根节点的元素,取出来的值始终有序,这也是PriorityQueue如此命名的原因。
在我之前的文章里,有详细介绍max_heap和min_heap的性质和相关函数实现:max_heap与min_heap ,在此不再赘述。由于PriorityQueue的实现不涉及到编程语言的特性,在此也不再附上go代码实现,感兴趣的读者可以自行依照原理去实现。
PriorityQueue在Top K问题上有广泛的运用,感兴趣的同学,可以练习一下Leetcode上PriorityQueue相关的题目,如下:
DelayQueue是一个延时队列,队头的元素始终是到期时间最早的。获取元素时,首先判断队头元素的delay是否小于等于0;当小于等于0时,才能取出;否则阻塞直到delay小于等于0。DelayQueue的特性如下:
因此,DelayQueue是PriorityQueue+BlockQueue的组合,广泛用于延时任务中。
我们首先来看一下java的实现方式。由于DelayQueue是根据元素延时时间确定出队顺序的,因此要求每个元素实现了Delayed接口,包含getDelay(TimeUnit.NANOSECONDS)方法,用于获取剩余的延时时间。DelayQueue的核心数据结构如下:
class DelayQueue { DelayQueue的内部数据跟BlockQueue整体差不多,也比较好理解。然后是DelayQueue的两个核心方法: 1 // 存入元素 consumer1调用take()时,队列没有元素。这个时候consumer1线程直接阻塞,leader=null --> producer1调用offer(),放入元素element1,delay=d1, 通过available.signal()唤醒consumer1 --> consumer1被唤醒,发现队列队头有元素element1且其dealy=d1>0,此时leader=consumer1, consumer1定时等待d1时长 --> 此时,又增加了若干消费者consumer_i(i=2,3,4..),依照源码,leader != null, 其他若干consumer都会无限时长阻塞 --> 然后又进来了一个元素element2,该元素的delay=d2比当前队头元素小(d1>d2>0),因此会调用available.signal(),那么可能唤醒: 综上,可以看出leader变量的重要性,保证队头元素的一定是定时等待,能按时唤醒消费线程。 DelayQueue依赖PriorityQueue,因此需要先实现一个PriorityQueue。基于golang的heap实现PriorityQueue,其中的元素需要有getDelay()方法: 1 // 优先队列中的元素 View Code // 初始化: golang中,没有与java Thread.currentThread()等价的函数,因此需要自己封装,用goroutine ID来唯一标记一个goroutine。网上有大量的goroutine ID获取方法,也可以参考3.8 例子:Goroutine ID · Go语言高级编程。本博客选取了实现较简单,性能较差的版本,如下: func GetGId() int { 在此主要演示DelayQueue的Take和Offer方法。 1 type DelayQueue struct { View Code 编写一个consumer和producer: 1 func DQProducer(dq *DelayQueue) { View Code // 正常消费 上面的分析与实现,基本覆盖了DelayQueue的所有要点,在此总结一下: 以上实现非常简单粗暴,基本参考java的实现方式,存在频繁的加锁解锁,以及获取goroutine ID的函数性能较差,仅能学习使用。goroutine之间的同步,既要考虑channel的关闭,阻塞和非阻塞,也要频繁的在阻塞前后释放锁和加锁,实现起来非常容易出错。想要实现一个应用级的DelayQueue,还有很多优化。由于golang和java的很多特性也不一样,可能也有完全不一样的golang实现版本。
private final transient ReentrantLock lock;
private final PriorityQueue
private Thread leader = null; // 最先消费的等待线程
private final Condition available; // 可用条件
...
}
2 public boolean offer(E e) {
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 q.offer(e); // 优先队列入队,非阻塞的
7 if (q.peek() == e) {
8 // 说明入队的元素delay时间最小,应当唤醒线程
9 leader = null; // important
10 available.signal(); // 唤醒其他等待的线程
11 }
12 return true
13 } finally {
14 lock.unlock();
15 }
16 }
17
18 // 阻塞获取元素
19 public E take() throws InterruptedException {
20 final ReentrantLock lock = this.lock;
21 lock.lockInterruptibly(); // 上锁
22 try {
23 // 阻塞式获取元素
24 for(;;) {
25 // 查看队头元素是否延时到期
26 E first = q.peek();
27 if (first == null)
28 available.await(); // 队头元素为空,说明没元素,等待
29 else {
30 long delay = first.getDelay(NANOSECONDS) // 获取延时
31 if (delay <= 0)
32 return q.poll() // 元素到期,直接返回
33 else {
34 // 需要等待
35 first = null; // 释放队头元素引用,方便其他线程获取元素并释放内存
36 // 以下部分是DelayQueue的精髓部分
37 if (leader != null)
38 // 说明已经有定时等待线程,只需要阻塞即可
39 available.await()
40 else {
41 // 说明没有定时等待线程,那么当前线程设置为定时等待线程,等待的时间为delay,保证延时结束能第一时间消费
42 Thread thisThread = Thread.currentThread();
43 leader = thisThread;
44 try {
45 available.awaitNanos(delay); // 定时等待
46 } finally {
47 if (leader == thisThread)
48 leader = null // 重置定时等待线程
49 }
50 }
51 }
52 }
53 }
54 } finally {
55 // 如果 leader 为空,且队列不为空,唤醒一个消费线程
56 if (leader == null && q.peek() != null)
57 available.signal();
58 lock.unlock();
59 }
60 }
整个流程比较复杂,在此引用【Java多线程】DelayQueue源码分析 (二十六) - H__D - 博客园总结的一张图,对整个过程进行总结:
DelayQueue最难理解的部分是其leader成员变量的设置。假设以下场景:
(1)leader线程,leader线程重置leader=null,然后查看队头元素的delay,重置该delay;
(2)非leader线程,此时 队头元素delay>0,同时由于入队时在available.signal()之前,重置了leader=null;所以此时被唤醒的线程会被设置成定时线程,定时时长为d1。保证了队头元素能按时唤醒。3.2 golang实现DelayQueue
3.2.1 实现PriorityQueue
2 type PqItem struct {
3 Item interface{} // 存放元素
4 Priority int64 // 优先级,在此以秒为单位的timestamp
5 }
6
7 func (i *PqItem) getDelay() int64 {
8 return i.Priority - time.Now().Unix()
9 }
10
11 // 优先队列,需要实现Len, Less, Swap, Push, Pop接口。同时为了方便获取队头元素,新增Peek方法
12 type PriorityQueue []*PqItem
13
14 func (pq *PriorityQueue) Len() int {
15 return len(*pq)
16 }
17
18 func (pq *PriorityQueue) Less(i, j int) bool {
19 return (*pq)[i].Priority <(*pq)[j].Priority
20 }
21
22 func (pq *PriorityQueue) Swap(i, j int) {
23 (*pq)[i], (*pq)[j] = (*pq)[j], (*pq)[i]
24 }
25
26 func (pq *PriorityQueue) Push(e interface{}) {
27 item, ok := e.(*PqItem)
28 if !ok {
29 return
30 }
31 *pq = append(*pq, item)
32 }
33
34 func (pq *PriorityQueue) Pop() interface{} {
35 n := len(*pq)
36 e := (*pq)[n-1]
37 *pq = (*pq)[0 : n-1]
38 return e
39 }
40
41 // 新增Peek和Clear方法
42 func (pq *PriorityQueue) Peek() *PqItem {
43 return (*pq)[0]
44 }
45
46 // Clear方法主要用在关闭队列时,清空队列元素,保证收发能收到队列关闭的信号
47 func (pq *PriorityQueue) Clear() {
48 *pq = (*pq)[0:0]
49 }
以上就是基于golang heap包实现的PriorityQueue,使用时:
pq := PriorityQueue{
// 填充好元素
}
heap.Init(&pq)
// 入队
heap.Push(&pq, &pqItem)
// 出队
heap.Pop(&pq)3.2.2 获取goroutine ID
var buf [64]byte
n := runtime.Stack(buf[:], false)
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
id, err := strconv.Atoi(idField)
if err != nil {
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
}
return id
}3.2.3 实现DelayQueue
2 pq PriorityQueue
3 mutex *sync.Mutex
4 available chan bool
5 leader int // 存放goroutine ID
6 closed bool // 标记available是否关闭
7 }
8
9 /* 带初始元素的DelayQueue初始化
10 * mems: 数据; delays: mems中的数据对应的延时,单位ms
11 */
12 func NewDelayQueue(mems []interface{}, delays []int64) (*DelayQueue, error) {
13 if len(delays) != len(mems) {
14 return nil, errors.New("len of delays isn't equal to len of mems")
15 }
16 pq := make(PriorityQueue, len(mems))
17 for idx, mem := range mems {
18 pq[idx] = &PqItem{
19 Priority: time.Now().Unix() + delays[idx],
20 Item: mem,
21 }
22 }
23 heap.Init(&pq) // PriorityQueue初始化
24
25 dq := &DelayQueue{
26 pq: pq,
27 mutex: new(sync.Mutex),
28 available: make(chan bool, 1),
29 leader: -1,
30 closed: false,
31 }
32 return dq, nil
33 }
34
35 // 入队, 本质上就是调用优先队列的入队操作
36 func (dq *DelayQueue) Offer(e interface{}, delay int64) (closed bool) {
37 dq.mutex.Lock()
38 defer func() {
39 // 用来捕获chanel close的异常,对于发送端来说,往一个close的channel发送数据会panic,只有通过recover才能知道available已经关闭
40 if err := recover(); err != nil {
41 dq.closed = true
42 closed = true
43 }
44 dq.mutex.Unlock()
45 }()
46 // 先判断queue是否关闭,关闭直接返回
47 if dq.closed {
48 return dq.closed
49 }
50 item := &PqItem{
51 Item: e,
52 Priority: time.Now().Unix() + delay,
53 }
54 heap.Push(&(dq.pq), item) // 放入优先队列
55 if dq.pq.Peek() == item {
56 // 注:由于不是每次入队都发送available,导致如果available关闭,发送端也不能及时检测到队列关闭;在close之前必须先清空available队列的元素。
57 dq.leader = -1 // very important
58 select {
59 case dq.available <- true:
60 return closed
61 default:
62 return closed
63 }
64 }
65 return closed
66 }
67
68 // 出队
69 func (dq *DelayQueue) Take() (e interface{}, err error, closed bool) {
70 dq.mutex.Lock()
71 defer func() {
72 if dq.leader == -1 && dq.pq.Len() > 0 && !closed {
73 // 一定要判断队列是否关闭,关闭了剩余的历史数据无法获取
74 select {
75 case dq.available <- true:
76 default:
77 }
78 }
79 dq.mutex.Unlock()
80 }()
81 if dq.closed {
82 return nil, nil, dq.closed
83 }
84 for {
85 for dq.pq.Len() == 0 {
86 // 无数据
87 dq.mutex.Unlock()
88 _, ok := <-dq.available
89 dq.mutex.Lock() // 先上锁,再判断是不是关闭
90 if !ok {
91 // 队列已经关闭
92 return nil, nil, true
93 }
94 }
95 first := dq.pq.Peek()
96 delay := first.getDelay()
97 if delay <= 0 {
98 PqItem := heap.Pop(&(dq.pq)).(*PqItem)
99 return PqItem.Item, nil, false
100 } else {
101 // 没到到期时间,需要等待
102 first = nil // 释放引用
103 gId := GetGId()
104 if dq.leader != -1 {
105 // 前面有等待的goroutine,此goroutine休眠,
106 dq.mutex.Unlock()
107 _, ok := <-dq.available
108 dq.mutex.Lock()
109 if !ok {
110 return nil, nil, true
111 }
112 } else {
113 // 前面没有等待的goroutine, 设置当前的goroutine进入定时等待
114 dq.leader = gId
115 dq.mutex.Unlock()
116 select {
117 case <-dq.available:
118 case <-time.After(time.Second * time.Duration(delay)):
119 }
120 dq.mutex.Lock()
121 if dq.leader == gId {
122 dq.leader = -1
123 }
124 }
125 }
126 }
127 return nil, nil, false
128 }
129
130 // 最后再实现一个Exit
131 func (dq *DelayQueue) Exit() {
132 dq.mutex.Lock()
133 dq.pq.Clear() // 清空队列
134 close(dq.available) // 关闭channel
135 dq.mutex.Unlock()
136 }3.2.4 测试
2 time.Sleep(time.Second * 14)
3 num := []int{87, 59, 18, 40, 0, 11, 89, 74}
4 delay := []int{1, 7, 3, 5, 6, 4, 2, 8}
5 for i := 0; i
6 if closed := dq.Offer(num[i], int64(delay[i])); closed {
7 break
8 }
9 //fmt.Printf("add num: %d, delay: %d\n", val, delay)
10 time.Sleep(time.Second)
11 }
12 }
13
14 func DQConsumer(dq *DelayQueue) {
15 gId := GetGId()
16 for {
17 item, err, closed := dq.Take()
18 if err != nil || closed {
19 return
20 }
21 fmt.Printf("consumer %d get value: %v, timestamp: %d\n", gId, item, time.Now().Unix())
22 }
23 }
24
25 func main() {
26 mems := []interface{}{4, 3, 7, 3, 43, 13, 23, 53}
27 delays := []int64{1, 3, 4, 7, 10, 8, 13, 12}
28 dq, _ := NewDelayQueue(mems, delays)
29 for i := 0; i <50; i++ {
30 go DQConsumer(dq, i+1)
31 }
32 go DQProducer(dq)
33 //go DQProducer(dq)
34 //go DQProducer(dq)
35 time.Sleep(time.Second * 30)
36 dq.Exit()
37 fmt.Println("queue has been closed")
38 time.Sleep(time.Second * 30)
39 }
首先DQProducer先sleep 14s,保证初始化的元素先消费完,顺便演示一下正常情况下的consumer情况。然后Producer每隔1s生产一个元素。执行结果如下:
consumer 18 get value: 4, timestamp: 1653828071
consumer 23 get value: 3, timestamp: 1653828073
consumer 21 get value: 7, timestamp: 1653828074
consumer 22 get value: 3, timestamp: 1653828078
consumer 19 get value: 13, timestamp: 1653828078
consumer 39 get value: 43, timestamp: 1653828080
consumer 24 get value: 53, timestamp: 1653828082
consumer 25 get value: 23, timestamp: 1653828083
consumer 26 get value: 87, timestamp: 1653828085
consumer 28 get value: 18, timestamp: 1653828089
consumer 27 get value: 59, timestamp: 1653828092
consumer 27 get value: 40, timestamp: 1653828092
consumer 29 get value: 89, timestamp: 1653828094 // 89在11之后入队,虽然delay一样,理论上应该是11 在前, 可能原因是终端打印顺序问题
consumer 20 get value: 11, timestamp: 1653828094
consumer 20 get value: 0, timestamp: 1653828094
consumer 35 get value: 74, timestamp: 1653828100
queue has been closed3.3 DelayQueue总结