热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

GoRabbitMQ工作队列(二)

rabbitMQ工作队列在之前内容中我们通过一个队列实现了消息的发送跟接收。接下来我们创建工作队列(WorkQueue),用于在多个工作者之间分配耗时的任务工作队列(任务队列)背后

rabbitMQ工作队列

在之前内容中我们通过一个队列实现了消息的发送跟接收。接下来我们创建工作队列(Work Queue),用于在多个工作者之间分配耗时的任务

工作队列(任务队列)背后的核心主要是避免立即执行资源密集型的任务,必须等待其工作完成。我们将任务封装为消息后将其发送到队列,后台的工作进程将弹出任务并最终执行,当我们运行很多Worker时候,任务将在它们之间共享


round-robin 调度



  • 使用任务队列的优点之一就是能够轻松的并行化工作

  • 默认情况下,RabbitMQ会将每一条信息按照消费者顺序发送给一个消费者,这样平均每个消费者会接收到相同数量的消息,这种消息分发的模式叫做round-robin(启动多个接收端,然后发送多个消息试试)


message acknowledgment(消息确认)

为了确保消息不会丢失,RabbitMQ支持消息确认,消费者消费了一个消息之后会发送一个ack给RabbitMQ,这样RabbitMQ就可以删除掉这个消息

如果一个消费者异常(通道关闭或链接关闭或TCP链接丢失)没有发送ACK给rabbitMQ,rabbitMQ会将该消息重新放入队列当中。此时如果有其他消费者在线,rabbitMQ会重新将该消息再次投递到另一个消费者



  • 手动确认ACK

    • 手动确认ACK我们可以在创建消费者的时候将auto-ack设置为false,一旦我们消费消息任务完毕的时候使用d.Ack(false)来确认ack,告诉RabbitMQ该消息可以删除

    msgs,err := ch.Consume(
    q.Name,
    "",
    false,//将autoAck设置为false,则需要在消费者每次消费完成
    // 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费
    false,
    false,
    false,
    nil,
    )
    FailError(err,"Failed to register a consumer")
    forever := make(chan bool)
    go func() {
    for d := range msgs{
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    //multiple为true的时候:此次交付和之前没有确认的交付都会在通过同一个通道交付,这在批量处理的时候很有用
    //为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除
    d.Ack(false)
    }
    }()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

    使用以上设置后,我们可以保证即使worker在执行任务的时候意外退出也不会丢失消息。在worker意外退出的不久之后消息将会被重新投递。确认ack必须使用接收到消息的通道,如果使用不同的通道将会导致一个通道协议异常



  • 忘记确认ack



    • 在开发的时候经常会忘记对消费过的消息进行ack确认,这是一个很严重的错误,可以使用以下命令查看RabbitMQ中有多少消息在准备中或是未确认的: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

    Listing queues for vhost / ...
    name messages_ready messages_unacknowledged
    hello 0 1


    • messages_ready:未投递的消息

    • messages_unacknowledged:投递未收到回复的消息




消息持久

我们已经知道如何确保即使消费者意外退出的情况下保证任务不会丢失。但是如果RabbitMQ服务停止的话任务还是会丢失。当RabbitMQ退出或异常的时候,它将会丢失队列和消息,除非你设置RabbitMQ的两个地方:将队列和消息进行标记为持久的



  1. 首先设置队列durable为true

    q, err := ch.QueueDeclare(
    "hello", // name
    true, // durable
    false, // delete when unused
    false, // exclusive
    false, // no-wait
    nil, // arguments
    )

    RabbitMQ不允许使用不同参数重新定义一个已经存在的队列,所以队列已经存在的话修改了上面的配置后运行程序是不会改变已经存在的队列的



  2. 然后设置消息为持久化存储:

    err = ch.Publish(
    "", // exchange
    q.Name, // routing key
    false, // mandatory
    false,
    amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType: "text/plain",
    Body: []byte(body),
    })

    注意:设置消息持久化并不能保证消息不会丢失,因为仍然有一小段时间片处于RabbitMQ收到消息但是还没保存,它可能只是保存在内存当中。但是已经满足我们的基本使用,如果你需要强保证的话可以使用publisher confirms




公平调度(Fair dispatch)



  • RabbitMQ的默认消息分配不能够满足我们的需要,比如有两个消费者,其中一个消费者经常忙碌的状态,另外一个消费者几乎不做任何工作,但是RabbitMQ仍然均匀的在两者之间调度消息。这是因为RabbitMQ只做队列当中的消息调度而没有查看某个消费者中未确认的消息,它只是盲目的将第n条消息发送给第n个消费者

  • 解决以上问题我们可以设置prefetch count数值为1,这样只有当消费者消费完消息并返回ack确认后RabbitMQ才会给其分发消息,否则只会将消息分发给其他空闲状态的消费者

err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)

注意:消费者必须要设置,生产者不用设置


完整代码



  • new_task.go

func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failError(err,"send:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
failError(err,"Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,// 设置为true之后RabbitMQ将永远不会丢失队列,否则重启或异常退出的时候会丢失
false,
false,
false,
nil,
)
failError(err,"Failed to declare a queue")
fmt.Println(q.Name)
body := bodyFrom(os.Args)
//生产者将消息发送到默认交换器中,不是发送到队列中
ch.Publish(
"",//默认交换器
q.Name,//使用队列的名字来当作route-key是因为声明的每一个队列都有一个隐式路由到默认交换器
false,
false,
amqp.Publishing{
DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
Body:[]byte(body),
})
failError(err,"Failed to publish a message")
log.Printf(" [x] Sent %s",body)
}
func bodyFrom(args []string)string {
var s string
if len(args) <2 || os.Args[1] == "" {
s = "hello"
}else {
s = strings.Join(args[1:]," ")
}
return s
}
func failError(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}


  • Worker.go

func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailError1(err,"receive:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
FailError1(err,"receive:Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
err = ch.Qos(
1, //// 在没有返回ack之前,最多只接收1个消息
0,
false,
)
FailError1(err,"Failed to set Qos")
msgs,err := ch.Consume(
q.Name,
"",
false,//将autoAck设置为false,则需要在消费者每次消费完成
// 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费
false,
false,
false,
nil,
)
FailError1(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
fmt.Println()
time.Sleep(t * time.Second)
log.Printf("Done")
//multiple为true的时候:此次交付和之前没有确认的交付都会在通过同一个通道交付,这在批量处理的时候很有用
//为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func FailError1(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}


推荐阅读
  • Git基础操作指南:掌握必备技能
    掌握 Git 基础操作是每个开发者必备的技能。本文详细介绍了 Git 的基本命令和使用方法,包括初始化仓库、配置用户信息、添加文件、提交更改以及查看版本历史等关键步骤。通过这些操作,读者可以快速上手并高效管理代码版本。例如,使用 `git config --global user.name` 和 `git config --global user.email` 来设置全局用户名和邮箱,确保每次提交时都能正确标识提交者信息。 ... [详细]
  • 为了在Fragment中直接调用Activity的方法,可以通过定义一个接口并让Activity实现该接口来实现。具体步骤包括:首先在Fragment中声明一个接口,并在Activity中实现该接口。接着,在Fragment中通过类型转换检查Activity是否实现了该接口,如果实现了则调用相应的方法。这种方法不仅提高了代码的解耦性,还增强了模块间的通信效率。此外,还可以通过ViewModel或LiveData等现代Android架构组件进一步优化这一过程,以实现更加高效和可靠的通信机制。 ... [详细]
  • Prim算法在处理稠密图时表现出色,尤其适用于边数远多于顶点数的情形。传统实现的时间复杂度为 \(O(n^2)\),但通过引入优先队列进行优化,可以在点数为 \(m\)、边数为 \(n\) 的情况下显著降低时间复杂度,提高算法效率。这种优化方法不仅能够加速最小生成树的构建过程,还能在大规模数据集上保持良好的性能表现。 ... [详细]
  • 本文深入探讨了 iOS 开发中 `int`、`NSInteger`、`NSUInteger` 和 `NSNumber` 的应用与区别。首先,我们将详细介绍 `NSNumber` 类型,该类用于封装基本数据类型,如整数、浮点数等,使其能够在 Objective-C 的集合类中使用。通过分析这些类型的特性和应用场景,帮助开发者更好地理解和选择合适的数据类型,提高代码的健壮性和可维护性。苹果官方文档提供了更多详细信息,可供进一步参考。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • 本文深入解析了 Apache 配置文件 `httpd.conf` 和 `.htaccess` 的优化方法,探讨了如何通过合理配置提升服务器性能和安全性。文章详细介绍了这两个文件的关键参数及其作用,并提供了实际应用中的最佳实践,帮助读者更好地理解和运用 Apache 配置。 ... [详细]
  • 前端技术实现调用摄像头进行拍照功能
    在公司项目中,为了实现调用摄像头进行拍照的功能,我们深入研究了HTML5的相关技术。尽管Java在许多方面表现出色,但在这一场景下,HTML5的灵活性和易用性更胜一筹。本文将分享具体的代码设计和实现细节,帮助开发者快速掌握这一功能。 ... [详细]
  • 大数据应用实例:电视收视率分析企业项目实操第二篇
    本文继续探讨大数据在电视收视率分析中的应用,详细介绍了如何在CentOS系统中进行防火墙管理。针对CentOS 6.5及更早版本,提供了具体的命令操作步骤,包括停止防火墙服务和禁用防火墙启动。此外,还深入讨论了这些操作对数据传输和系统安全的影响,为实际项目实施提供了宝贵的技术参考。 ... [详细]
  • 如何在IDEA中安装和配置反编译插件以提高代码审查效率
    在 IntelliJ IDEA 中提升代码审查效率的一种方法是安装和配置反编译插件。首先,进入 IDEA 的设置界面,然后导航到插件管理部分。接下来,搜索 "ideaJad" 插件并进行安装。安装完成后,重启 IDEA 以确保插件生效。这将帮助你在审查二进制文件时更加高效地查看源代码。 ... [详细]
  • 优化后的标题:数据网格视图(DataGridView)在应用程序中的高效应用与优化策略
    在应用程序中,数据网格视图(DataGridView)的高效应用与优化策略至关重要。本文探讨了多种优化方法,包括但不限于:1)通过合理的数据绑定提升性能;2)利用虚拟模式处理大量数据,减少内存占用;3)在格式化单元格内容时,推荐使用CellParsing事件,以确保数据的准确性和一致性。此外,还介绍了如何通过自定义列类型和优化渲染过程,进一步提升用户体验和系统响应速度。 ... [详细]
  • 本文深入探讨了原型模式在软件设计中的应用与实现。原型模式通过使用已有的实例作为原型来创建新对象,而不是直接通过类实例化。这种方式不仅简化了对象的创建过程,还提高了系统的灵活性和效率。具体来说,原型模式涉及一个支持克隆功能的接口或基类,子类通过实现该接口来提供具体的克隆方法,从而实现对象的快速复制。此外,文章还详细分析了原型模式的优缺点及其在实际项目中的应用场景,为开发者提供了实用的指导和建议。 ... [详细]
  • 在执行 Vim/VM 命令时遇到错误提示:检测到名为
    在使用 Docker 时,通过 Vim 编辑 Dockerfile 文件时遇到了错误提示:“检测到名为 .dockerfile.swp 的交换文件”。这一问题通常是因为上次编辑该文件时意外中断,导致系统生成了临时的交换文件。为了解决这个问题,可以手动删除该交换文件或使用 Vim 的恢复功能来恢复未保存的更改。 ... [详细]
  • Python学习:环境配置与安装指南
    Python作为一种跨平台的编程语言,适用于Windows、Linux和macOS等多种操作系统。为了确保本地已成功安装Python,用户可以通过终端或命令行界面输入`python`或`python3`命令进行验证。此外,建议使用虚拟环境管理工具如`venv`或`conda`,以便更好地隔离不同项目依赖,提高开发效率。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
author-avatar
rvr4845591
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有