热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

golangRabbitMQ教程一WorkQueues

在上一篇介绍中我们实现了从消息产生程序发送简单消息到命名队列的过程,本篇将实现一个将耗时任务分发到多个消费者程序的工作队列。工作队列的主要思想是避免对资源密集型任务处理时的等待,

在上一篇介绍中我们实现了从消息产生程序发送简单消息到命名队列的过程,本篇将实现一个将耗时任务分发到多个消费者程序的工作队列。

工作队列的主要思想是避免对资源密集型任务处理时的等待,而是先将任务压入队列,后期再进行计划处理。我们将任务封装成消息发送给队列,由队列程序按策略分发到所有的在线工作者程序执行。当有多个工作程序同时在线时,多项任务同时被多个不同的工作者处理便成为可能。

在Web应用程序领域,如果需要在一个HTTP短连接中完成一些复杂的耗时任务时,工作队列的思想能大幅提高处理效率而带来了更好的用户体验。

准备


在上一篇介绍中我们发送一个固定的字符串“Hello world"到队列,然后在接收程序中打印出来。这里没有实现诸如图片大小的调整、PDF文件的渲染等真实的复杂任务,而是用特定字符串来表示复杂任务,导致消息处理程序忙碌。任务处理程序通过time.Sleep函数让线程睡眠来模拟复杂度,以一连串的字符"."来表示任务的复杂度,每一个点表示停顿1秒钟,如"Hello..."表示任务耗时3秒钟。

还是在之前例子的send.go文件上进行修改,让程序通过命令行将任意个消息参数传递到队列,姑且将新文件命名为new_task.go:

body := bodyFrom(os.Args)
err = ch.Publish(
    "",     //exchange
    q.Name, //routing key
    false,  //mandatory
    false,
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:    "text/plain",
        Body:           []byte(body),
    }
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

同样的,receive.go文件也需要进行修改,根据消息体中"."的个数来模拟任务的耗时长度。该文件的任务还是从队列中取出一个任务并执行,我们姑且称之为work.go:

msgs, err := ch.Consume(
    q.Name,     //queue
    "",         //consumer
    true,       //auto-ack
    false,      //exclusive
    false,      //no-local
    false,      //no-wait
    nil,        //args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func(){
    for d:= range msgs{
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t* time.Second)
        log.Printf("Done")
    }
}()

log.Printf(" [*] Waiting for message. To exit press CTRL+C")
<-forever

好了,到此为止,我们先来看看上述的模拟是否成功:

先运行worker.go

#shell 1
go run worker.go

然后另起一个终端运行new_task.go

#shell 2
go run new_task.go ray..

上述命令执行完后,继续测试:

#shell 2
go run new_task.go ray....

第一次new_task.go ray..有两个点,Done的输出间隔是2秒,ray.....,输出间隔是5秒。那么恭喜,到目前为止,我们一切顺利。

轮询调度(Round-robin dispatching)


工作队列的优势是能轻松处理多个积压的任务,如果有一个已经堆满的任务队列待处理,只需添加多个消费者,这些消费者便都能对队列进行消耗。

首先,想象一下如果同时运行两个worker.go脚本,当生产者不断发送消息到队列时,会出现什么情况?

我们需要三个终端来运行这个小例子,两个运行worker.go,我们将他们看成两个消费者C1和C2.

#shell 1
go run worker.go


#shell 2
go run worker.go

然后在第三个终端中,发送消息到队列,你可以尝试多次,如下:

#shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
...

因为实在另外机器上的缘故,本帖不贴该结果。

默认地,RabbitMQ会按收到的消息顺序依次发送到每一个消费者中,从总体上来看,每个消费者会收到同样多的消息。这种消息分发方式叫做round-robin(轮询调度).

消息确认


当处理一个长耗时任务时,任务处理程序(消费者)可能由于某种原因以外崩溃,那么此时会发生什么事情呢?在我们目前的代码中,一旦RabbitMQ将消息发送到消费者时就会将其标记并删除,而不会去关心消费者程序是否执行完毕。因此在这种情形下,如果你关闭了一个正在处理某项任务的消费者时,会导致其正在处理的及已分发给它却还没来得及处理的任务丢失。

然而在很多真实情况下,我们并不希望丢失掉任何一条消息,如订单信息、支付信息等。当某一消费者突然崩溃后,我们希望将其未处理完毕的消息转发到其他消费者进行处理,这种思想有如我们常见的主备设置策略。

RabbitMQ提供消息确认机制来确保每一个消息都不会丢失,其原理是当RabbitMQ接收到一个从消费者发出的表明任务已处理完毕的确认包(ack)后,才其从队列中释放删除。

如果某一个消费者突然崩溃(如通道关闭、连接关闭或TCP连接丢失)而没有发出确认包,RabbitMQ将会认为该消息并没有被完全处理,因此会重新将其加入到队列中。如果在此时还有其他消费者在线,那么当前消息也会很快被分发处理掉,这样即使在某些消费者意外掉线关闭的情况下,我们也能确保所有消息会被丢失。

消息确认没有超时机制,RabbitMQ只会在消费者Down掉之后才进行重新分发,因此即使对于某些耗时很长的任务也不会有影响。

在这个Demo里面,我们将Consume()函数的aotu-ack参数设为false,然后当任务处理完毕之后通过d.Ack(false)手动发送一个确认消息

msgs, err := ch.Consume(
    q.Name, // queue
    "", // consumer
    false, // auto-ack
    false, // exclusive
    false, // no-local
    false, // no-wait
    nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
    for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false)
    }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

这样,即使我们通过Ctrl+C来关闭某一个正在处理消息的消费者,其消息也不会丢失,RabbitMQ马上就会将当前未确认的消息转发的其他消费者处理。

需要注意的是消息确认包的目的地必须是当前消息的接收通道,如果将确认包发送到其他通道时会引发异常。更多的信息科参考doc guide on confirmations.

Forgotten acknowledgment

忘记对消息进行确认是一个比较常见的错误,这个错误很容易犯,但是后果很严重。

当消费者退出后消息会重发,却永远没有确认删除的包,因此RabbitMQ消息越积越多就会吃掉越来越多的内存,最后可能导致崩溃。

对于这种未确认的消息调试,我们可以使用rabbitmqcrl命令来打印message_unacknowledged的内容:

sudo rabbitmqctl list_queue name message_ready message_unacknowledged

在Windows下,

rabbitmqctl.bat list_queue name message_ready message_unacknowledged

消息持久化


前面我们讲解了当消费者程序Down掉如何保证消息不丢失。可如果是RabbitMQ崩溃呢?消息还能保证不丢失吗?

当RabbitMQ退出或崩溃时,除非你明确地指定,否则所有的队列和消息都会丢失。要做到消息不丢失需满足两个条件:队列和消息的持久化。

首先,要保证队列不会丢失,可将队列声明为持久化:

q, err := ch.QueueDeclare(
    "hello",        //name
    true,           //durable
    false,          //delete when unused
    false,          //exclusive
    false,          //no-wait
    nil,            //arguments
)
failOnError(err, "Failed to declare a queue")

上述代码看起来没有问题,但到目前为止,如果直接就这样运行,那么队列还是无法持久化而导致丢失。这是因为我们之前已经定义了一个名为“Hello”的队列,RabbitMQ不允许创建多个名称相同而参数不同的队列,这个跟函数重载有区别,但这种情况发生时,RabbitMQ会返回错误。既然如此,直接换个名字:task_queue,

q, err := ch.QueueDeclare(
    "task_queue",        //name
    true,           //durable
    false,          //delete when unused
    false,          //exclusive
    false,          //no-wait
    nil,            //arguments
)
failOnError(err, "Failed to declare a queue")

注意:durable参数在生产者和消费者程序中都要指定为True。

现在,task_queue队列即使在RabbitMQ重启之后也不会丢失了。接着就需要实现对消息的持久化,这个也很简单,只需要在amqp.Publishing函数中设置一下amqp.Persistent参数即可:

err = ch.Publishing(
    "",         //exchange
    q.Name,     // routing key
    false,      // mandatory
    false,
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:    "text/plain",
        Body:           []byte(body),
    }
)

关于消息持久化

将消息设置为Persistent并不能百分百地完全保证消息不会丢失。虽然RabbitMQ知道要将消息写到磁盘,但在RabbitMQ接收到消息和写入磁盘前还是有个时间空档。

因为RabbitMQ并不会对每一个消息都执行fsync(2),因此消息可能只是写入缓存而不是磁盘。

所以Persistent选项并不是完全强一致性的,但在应付我们的简单场景已经足够。如需对消息完全持久化,可参考publisher confirms.

公平分发


有时候队列的轮询调度并不能满足我们的需求,假设有这么一个场景,存在两个消费者程序,所有的单数序列消息都是长耗时任务而双数序列消息则都是简单任务,那么结果将是一个消费者一直处于繁忙状态而另外一个则几乎没有任务被挂起。当RabbitMQ对此情况却是视而不见,仍然根据轮询来分发消息。

导致这种情况发生的根本原因是RabbitMQ是根据消息的入队顺序进行派发,而并不关心在线消费者还有多少未确认的消息,它只是简单的将第N条消息分发到第N个消费者:

为了避免这种情况,我们可以给队列设置预取数(prefect count)为1。它告诉RabbitMQ不要一次性分发超过1个的消息给某一个消费者,换句话说,就是当分发给该消费者的前一个消息还没有收到ack确认时,RabbitMQ将不会再给它派发消息,而是寻找下一个空闲的消费者目标进行分发。

err = ch.Qos(
    1,      // prefetch count
    0,      // prefetch size
    false,  // global
)
failOnError(err, "Failed to set Qos")

关于队列长度

NOTE:如果所有的消费者都繁忙,队列可能会被消息填满。你需要注意这种情况,要么通过增加消费者来处理,要么改用其他的策略。

整合上面的代码

我们将上面的片段整合起来,那么new_task.go:

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "",           // exchange
                q.Name,       // routing key
                false,        // mandatory
                false,
                amqp.Publishing{
                        DeliveryMode: amqp.Persistent,
                        ContentType:  "text/plain",
                        Body:         []byte(body),
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) <2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

Github地址 new_task.go.

worker.go文件如下:

package main

import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                        dot_count := bytes.Count(d.Body, []byte("."))
                        t := time.Duration(dot_count)
                        time.Sleep(t * time.Second)
                        log.Printf("Done")
                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

Github地址:worker.go.

最后,为了验证上面轮询调度、消息持久化和公平分发的特性,你可以多开几个Shell窗口,发几条长耗时的消息,然后停掉某一些worker或重启RabbitMQ就能观察到与之相符的现象。

总结

本篇介绍了通过消息确认机制和设置预取消息长度的方式来实现一个工作队列,而持久化选项的设置可以保证队列和消息在出现消费者崩溃或RabbitMQ重启的异常情况下都不会丢失。


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 本书详细介绍了在最新Linux 4.0内核环境下进行Java与Linux设备驱动开发的全面指南。内容涵盖设备驱动的基本概念、开发环境的搭建、操作系统对设备驱动的影响以及具体开发步骤和技巧。通过丰富的实例和深入的技术解析,帮助读者掌握设备驱动开发的核心技术和最佳实践。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • 可查看rabbitmq官方集群方案架构图Thisguidecoversmirroring(queuecontentsreplication)ofclassicqueues--摘自 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 进程(Process)是指计算机中程序对特定数据集的一次运行活动,是系统资源分配与调度的核心单元,构成了操作系统架构的基础。在早期以进程为中心的计算机体系结构中,进程被视为程序的执行实例,其状态和控制信息通过任务描述符(task_struct)进行管理和维护。本文将深入探讨进程的概念及其关键数据结构task_struct,解析其在操作系统中的作用和实现机制。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • Java队列机制深度解析与应用指南
    Java队列机制在并发编程中扮演着重要角色。本文深入解析了Java队列的各种实现类及其应用场景,包括`LinkedList`、`ArrayBlockingQueue`和`PriorityQueue`等,并探讨了它们在高并发环境下的性能表现和适用场景。通过详细分析这些队列的内部机制和使用技巧,帮助开发者更好地理解和应用Java队列,提升系统的设计和架构能力。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 题目旨在解决树上的路径最优化问题,具体为在给定的树中寻找一条长度介于L到R之间的路径,使该路径上的边权平均值最大化。通过点分治策略,可以有效地处理此类问题。若无长度限制,可采用01分数规划模型,将所有边权减去一个常数m,从而简化计算过程。此外,利用单调队列优化动态规划过程,进一步提高算法效率。 ... [详细]
  • 在Adobe After Effects中,通过高效添加地图指北针,可以显著提升地理信息的准确性和视觉效果。本文介绍了一种方法,利用代码函数 `public static void ExportMapEx(string filepath, int resolution, AxMapControl curMapControl)`,实现地图导出时自动添加指北针,确保地理数据的精确性和一致性。此外,还详细探讨了如何优化指北针的位置和样式,以增强地图的可读性和专业性。 ... [详细]
  • 软件开发史上最具影响力的十位编程大师(附图解)
    在软件开发领域,有十位编程大师对行业发展产生了深远影响。本文基于国外知名社区的一项评选,通过图文并茂的形式,详细介绍了这十位杰出人物,包括游戏开发先驱John Carmack等,为读者呈现了他们卓越的技术贡献与创新精神。 ... [详细]
  • Spring cloud微服务架构前后端分离博客系统,Vue+boot源码分享 ... [详细]
author-avatar
帕皮丝汀阿奎莱拉
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有