在上一篇介绍中我们实现了从消息产生程序发送简单消息到命名队列的过程,本篇将实现一个将耗时任务分发到多个消费者程序的工作队列。
工作队列的主要思想是避免对资源密集型任务处理时的等待,而是先将任务压入队列,后期再进行计划处理。我们将任务封装成消息发送给队列,由队列程序按策略分发到所有的在线工作者程序执行。当有多个工作程序同时在线时,多项任务同时被多个不同的工作者处理便成为可能。
在Web应用程序领域,如果需要在一个HTTP短连接中完成一些复杂的耗时任务时,工作队列的思想能大幅提高处理效率而带来了更好的用户体验。
准备
在上一篇介绍中我们发送一个固定的字符串“Hello world"到队列,然后在接收程序中打印出来。这里没有实现诸如图片大小的调整、PDF文件的渲染等真实的复杂任务,而是用特定字符串来表示复杂任务,导致消息处理程序忙碌。任务处理程序通过time.Sleep函数让线程睡眠来模拟复杂度,以一连串的字符"."来表示任务的复杂度,每一个点表示停顿1秒钟,如"Hello..."表示任务耗时3秒钟。
还是在之前例子的send.go文件上进行修改,让程序通过命令行将任意个消息参数传递到队列,姑且将新文件命名为new_task.go:
body := bodyFrom(os.Args)
err = ch.Publish(
"", //exchange
q.Name, //routing key
false, //mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
同样的,receive.go文件也需要进行修改,根据消息体中"."的个数来模拟任务的耗时长度。该文件的任务还是从队列中取出一个任务并执行,我们姑且称之为work.go:
msgs, err := ch.Consume(
q.Name, //queue
"", //consumer
true, //auto-ack
false, //exclusive
false, //no-local
false, //no-wait
nil, //args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t* time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for message. To exit press CTRL+C")
<-forever
好了,到此为止,我们先来看看上述的模拟是否成功:
先运行worker.go
#shell 1
go run worker.go
然后另起一个终端运行new_task.go
#shell 2
go run new_task.go ray..
上述命令执行完后,继续测试:
#shell 2
go run new_task.go ray....
第一次new_task.go ray..有两个点,Done的输出间隔是2秒,ray.....,输出间隔是5秒。那么恭喜,到目前为止,我们一切顺利。
轮询调度(Round-robin dispatching)
工作队列的优势是能轻松处理多个积压的任务,如果有一个已经堆满的任务队列待处理,只需添加多个消费者,这些消费者便都能对队列进行消耗。
首先,想象一下如果同时运行两个worker.go脚本,当生产者不断发送消息到队列时,会出现什么情况?
我们需要三个终端来运行这个小例子,两个运行worker.go,我们将他们看成两个消费者C1和C2.
#shell 1
go run worker.go
#shell 2
go run worker.go
然后在第三个终端中,发送消息到队列,你可以尝试多次,如下:
#shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
...
因为实在另外机器上的缘故,本帖不贴该结果。
默认地,RabbitMQ会按收到的消息顺序依次发送到每一个消费者中,从总体上来看,每个消费者会收到同样多的消息。这种消息分发方式叫做round-robin(轮询调度).
消息确认
当处理一个长耗时任务时,任务处理程序(消费者)可能由于某种原因以外崩溃,那么此时会发生什么事情呢?在我们目前的代码中,一旦RabbitMQ将消息发送到消费者时就会将其标记并删除,而不会去关心消费者程序是否执行完毕。因此在这种情形下,如果你关闭了一个正在处理某项任务的消费者时,会导致其正在处理的及已分发给它却还没来得及处理的任务丢失。
然而在很多真实情况下,我们并不希望丢失掉任何一条消息,如订单信息、支付信息等。当某一消费者突然崩溃后,我们希望将其未处理完毕的消息转发到其他消费者进行处理,这种思想有如我们常见的主备设置策略。
RabbitMQ提供消息确认机制来确保每一个消息都不会丢失,其原理是当RabbitMQ接收到一个从消费者发出的表明任务已处理完毕的确认包(ack)后,才其从队列中释放删除。
如果某一个消费者突然崩溃(如通道关闭、连接关闭或TCP连接丢失)而没有发出确认包,RabbitMQ将会认为该消息并没有被完全处理,因此会重新将其加入到队列中。如果在此时还有其他消费者在线,那么当前消息也会很快被分发处理掉,这样即使在某些消费者意外掉线关闭的情况下,我们也能确保所有消息会被丢失。
消息确认没有超时机制,RabbitMQ只会在消费者Down掉之后才进行重新分发,因此即使对于某些耗时很长的任务也不会有影响。
在这个Demo里面,我们将Consume()函数的aotu-ack参数设为false,然后当任务处理完毕之后通过d.Ack(false)手动发送一个确认消息
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
这样,即使我们通过Ctrl+C来关闭某一个正在处理消息的消费者,其消息也不会丢失,RabbitMQ马上就会将当前未确认的消息转发的其他消费者处理。
需要注意的是消息确认包的目的地必须是当前消息的接收通道,如果将确认包发送到其他通道时会引发异常。更多的信息科参考doc guide on confirmations.
Forgotten acknowledgment
忘记对消息进行确认是一个比较常见的错误,这个错误很容易犯,但是后果很严重。
当消费者退出后消息会重发,却永远没有确认删除的包,因此RabbitMQ消息越积越多就会吃掉越来越多的内存,最后可能导致崩溃。
对于这种未确认的消息调试,我们可以使用rabbitmqcrl命令来打印message_unacknowledged的内容:
sudo rabbitmqctl list_queue name message_ready message_unacknowledged
在Windows下,
rabbitmqctl.bat list_queue name message_ready message_unacknowledged
消息持久化
前面我们讲解了当消费者程序Down掉如何保证消息不丢失。可如果是RabbitMQ崩溃呢?消息还能保证不丢失吗?
当RabbitMQ退出或崩溃时,除非你明确地指定,否则所有的队列和消息都会丢失。要做到消息不丢失需满足两个条件:队列和消息的持久化。
首先,要保证队列不会丢失,可将队列声明为持久化:
q, err := ch.QueueDeclare(
"hello", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
上述代码看起来没有问题,但到目前为止,如果直接就这样运行,那么队列还是无法持久化而导致丢失。这是因为我们之前已经定义了一个名为“Hello”的队列,RabbitMQ不允许创建多个名称相同而参数不同的队列,这个跟函数重载有区别,但这种情况发生时,RabbitMQ会返回错误。既然如此,直接换个名字:task_queue,
q, err := ch.QueueDeclare(
"task_queue", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
注意:durable参数在生产者和消费者程序中都要指定为True。
现在,task_queue队列即使在RabbitMQ重启之后也不会丢失了。接着就需要实现对消息的持久化,这个也很简单,只需要在amqp.Publishing函数中设置一下amqp.Persistent参数即可:
err = ch.Publishing(
"", //exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
关于消息持久化
将消息设置为Persistent并不能百分百地完全保证消息不会丢失。虽然RabbitMQ知道要将消息写到磁盘,但在RabbitMQ接收到消息和写入磁盘前还是有个时间空档。
因为RabbitMQ并不会对每一个消息都执行fsync(2),因此消息可能只是写入缓存而不是磁盘。
所以Persistent选项并不是完全强一致性的,但在应付我们的简单场景已经足够。如需对消息完全持久化,可参考publisher confirms.
公平分发
有时候队列的轮询调度并不能满足我们的需求,假设有这么一个场景,存在两个消费者程序,所有的单数序列消息都是长耗时任务而双数序列消息则都是简单任务,那么结果将是一个消费者一直处于繁忙状态而另外一个则几乎没有任务被挂起。当RabbitMQ对此情况却是视而不见,仍然根据轮询来分发消息。
导致这种情况发生的根本原因是RabbitMQ是根据消息的入队顺序进行派发,而并不关心在线消费者还有多少未确认的消息,它只是简单的将第N条消息分发到第N个消费者:
为了避免这种情况,我们可以给队列设置预取数(prefect count)为1。它告诉RabbitMQ不要一次性分发超过1个的消息给某一个消费者,换句话说,就是当分发给该消费者的前一个消息还没有收到ack确认时,RabbitMQ将不会再给它派发消息,而是寻找下一个空闲的消费者目标进行分发。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set Qos")
关于队列长度
NOTE:如果所有的消费者都繁忙,队列可能会被消息填满。你需要注意这种情况,要么通过增加消费者来处理,要么改用其他的策略。
整合上面的代码
我们将上面的片段整合起来,那么new_task.go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) <2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
Github地址 new_task.go.
worker.go文件如下:
package main
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
Github地址:worker.go.
最后,为了验证上面轮询调度、消息持久化和公平分发的特性,你可以多开几个Shell窗口,发几条长耗时的消息,然后停掉某一些worker或重启RabbitMQ就能观察到与之相符的现象。
总结
本篇介绍了通过消息确认机制和设置预取消息长度的方式来实现一个工作队列,而持久化选项的设置可以保证队列和消息在出现消费者崩溃或RabbitMQ重启的异常情况下都不会丢失。