热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ消息顺序、消息幂等、消息重复、消息事务、集群

1.消息顺序场景:比如下单操作,下单成功之后,会发布创建订单和扣减库存消息,但扣减库存消息执行会先于创建订单消息,也就说前者执行成功之后,才能执行后者。不保证完全按照顺序消费,在M

1. 消息顺序

场景:比如下单操作,下单成功之后,会发布创建订单和扣减库存消息,但扣减库存消息执行会先于创建订单消息,也就说前者执行成功之后,才能执行后者。

不保证完全按照顺序消费,在 MQ 层面支持消息的顺序处理开销太大,为了极少量的需求,增加整体上的复杂度得不偿失。

所以,还是在应用层面处理比较好,或者业务逻辑进行处理。

应用层解决方式:



  • 1. 消息实体中增加:版本号 & 状态机 & msgid & parent_msgid,通过 parent_msgid 判断消息的顺序(需要全局存储,记录消息的执行状态)。 2. “同步执行”:当一个消息执行完之后,再发布下一个消息。


2. 消息幂等、消息重复、消息事务


消息重复

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

消费端处理消息的业务逻辑保持幂等性。

保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

第 1 条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第 2 条原理就是利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

第 1 条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第 2 条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RabbitMQ 不解决消息重复的问题的原因。

RabbitMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。


AMQP 消费者确认机制

AMQP 定义了消费者确认机制(message ack),如果一个消费者应用崩溃掉(此时连接会断掉,broker 会得知),但是 broker 尚未获得 ack,那么消息会被重新放入队列。所以 AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。


AMQP 生产者事务

对于生产者,AMQP 定义了事务(tx transaction)来确保生产消息被 broker 接收并成功入队。TX 事务是阻塞调用,生产者需等待broker写磁盘后返回的确认,之后才能继续发送消息。事务提交失败时(如broker宕机场景),broker并不保证提交的消息全部入队。

TX 的阻塞调用使 broker 的性能非常差,RabbitMQ 使用 confirm 机制来优化生产消息的确认。Confirm 模式下,生产者可以持续发送消息,broker 将消息批量写磁盘后回复确认,生产者通过确认消息的ID来确定哪些已发送消息被成功接收。Confirm 模式下生产者发送消息和接受确认是异步流程,生产者需要缓存未确认的消息以便出错时重新发送。


总结



  • 1. 消息重复发布:不存在,因为 AMQP 定义了事务(tx transaction)来确保生产消息被 broker 接收并成功入队。TX 事务是阻塞调用,生产者需等待 broker 写磁盘后返回的确认,之后才能继续发送消息。事务提交失败时(如 broker 宕机场景),broker 并不保证提交的消息全部入队。RabbitMQ 使用 confirm 机制来优化生产消息的确认(可以持续发布消息,但会批量回复确认)。 2. 消息重复消费:AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。

应用层解决方式:



  • 1. 专门的 Map 存储:用来存储每个消息的执行状态(用 msgid 区分),执行成功之后更新 Map,有另外消息重复消费的时候,读取 Map 数据判断 msgid 对应的执行状态,已消费则不执行。 2. 业务逻辑判断:消息执行完会更改某个实体状态,判断实体状态是否更新,如果更新,则不进行重复消费。

特别说明:AMQP 协议中的事务仅仅是指生产者发送消息给 broker 这一系列流程处理的事务机制,并不包含消费端的处理流程。


3. 集群

原 RabbitMQ 集群:manager1、manager2、manager3 节点均为磁盘存储,manager1 为主节点,HAProxy 负载三个节点。

现 RabbitMQ 集群更新(更合理的配置):



  • 1. RabbitMQ 集群更新:manager1、manager2 节点类型改为 ram(内存存储),manager3 节点类型为 disc(磁盘存储,用于保存集群配置和元数据),主节点变更为 manager3。 2. HAProxy 负载更新:移除 manager3 负载(5672 端口),只保留 manage2、manager2 负载。


4. Kafka 和 RabbitMQ 对比

Kafka 的设计有明确的介绍:。

Kafka 应对场景:消息持久化、吞吐量是第一要求、状态由客户端维护、必须是分布式的。Kafka 认为 broker 不应该阻塞生产者,高效的磁盘顺序读写能够和网络 IO 一样快,同时依赖现代 OS 文件系统特性,写入持久化文件时并不调用 flush,仅写入 OS pagecache,后续由 OS flush。

这些特性决定了 Kafka 没有做“确认机制”,而是直接将生产消息顺序写入文件、消息消费后不删除(避免文件更新),该实现充分利用了磁盘 IO,能够达到较高的吞吐量。代价是消费者要依赖 Zookeeper 记录队列消费位置、处理同步问题。没有消费确认机制,还导致了 Kafka 无法了解消费者速度,不能采用 push 模型以合理的速度向消费者推送数据,只能利用 pull 模型由消费者来拉消息(消费者承担额外的轮询开销)。

如果在 Kafka 中引入消费者确认机制,就需要 broker 维护消息消费状态,要做到高可靠就需要写文件持久化并与生产消息同步,这将急剧降低 Kafka 的性能,这种设计也极类似 RabbitMQ。如果不改变 Kafka 的实现,而是在 Kafka 和消费者之间做一层封装,还是需要实现一套类似 RabbitMQ 的消费确认和持久化机制。

参考资料:

1. 消息顺序 场景:比如下单操作,下单成功之后,会发布创建订单和扣减库存消息,但扣减库存消息执行会先于创建订单消息,也就说前者执行成功之后,才能执行后者。 不保证完全按照顺序消费,在 MQ 层面支持消息的顺序处理开销太大,为了极少量的需求,增加整体上的复杂度得不偿失。 所以,还是在应用层面处理比较好,或者业务逻辑进行处理。 应用层解决方式: 1. 消息实体中增加:版本号 & 状态机 & msgid & parent_msgid,通过 parent_msgid 判断消息的顺序(需要全局存储,记录消息的执行状态)。 2. “同步执行”:当一个消息执行完之后,再发布下一个消息。 2. 消息幂等、消息重复、消息事务 消息重复 造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理? 消费端处理消息的业务逻辑保持幂等性。 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。 第 1 条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第 2 条原理就是利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。 第 1 条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第 2 条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RabbitMQ 不解决消息重复的问题的原因。 RabbitMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。 AMQP 消费者确认机制 AMQP 定义了消费者确认机制(message ack),如果一个消费者应用崩溃掉(此时连接会断掉,broker 会得知),但是 broker 尚未获得 ack,那么消息会被重新放入队列。所以 AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。 AMQP 生产者事务 对于生产者,AMQP 定义了事务(tx transaction)来确保生产消息被 broker 接收并成功入队。TX 事务是阻塞调用,生产者需等待broker写磁盘后返回的确认,之后才能继续发送消息。事务提交失败时(如broker宕机场景),broker并不保证提交的消息全部入队。 TX 的阻塞调用使 broker 的性能非常差,RabbitMQ 使用 confirm 机制来优化生产消息的确认。Confirm 模式下,生产者可以持续发送消息,broker 将消息批量写磁盘后回复确认,生产者通过确认消息的ID来确定哪些已发送消息被成功接收。Confirm 模式下生产者发送消息和接受确认是异步流程,生产者需要缓存未确认的消息以便出错时重新发送。 总结 1. 消息重复发布:不存在,因为 AMQP 定义了事务(tx transaction)来确保生产消息被 broker 接收并成功入队。TX 事务是阻塞调用,生产者需等待 broker 写磁盘后返回的确认,之后才能继续发送消息。事务提交失败时(如 broker 宕机场景),broker 并不保证提交的消息全部入队。RabbitMQ 使用 confirm 机制来优化生产消息的确认(可以持续发布消息,但会批量回复确认)。 2. 消息重复消费:AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。 应用层解决方式: 1. 专门的 Map 存储:用来存储每个消息的执行状态(用 msgid 区分),执行成功之后更新 Map,有另外消息重复消费的时候,读取 Map 数据判断 msgid 对应的执行状态,已消费则不执行。 2. 业务逻辑判断:消息执行完会更改某个实体状态,判断实体状态是否更新,如果更新,则不进行重复消费。 特别说明:AMQP 协议中的事务仅仅是指生产者发送消息给 broker 这一系列流程处理的事务机制,并不包含消费端的处理流程。 3. 集群 原 RabbitMQ 集群:manager1、manager2、manager3 节点均为磁盘存储,manager1 为主节点,HAProxy 负载三个节点。 现 RabbitMQ 集群更新(更合理的配置): 1. RabbitMQ 集群更新:manager1、manager2 节点类型改为 ram(内存存储),manager3 节点类型为 disc(磁盘存储,用于保存集群配置和元数据),主节点变更为 manager3。 2. HAProxy 负载更新:移除 manager3 负载(5672 端口),只保留 manage2、manager2 负载。 4. Kafka 和 RabbitMQ 对比 Kafka 的设计有明确的介绍:。 Kafka 应对场景:消息持久化、吞吐量是第一要求、状态由客户端维护、必须是分布式的。Kafka 认为 broker 不应该阻塞生产者,高效的磁盘顺序读写能够和网络 IO 一样快,同时依赖现代 OS 文件系统特性,写入持久化文件时并不调用 flush,仅写入 OS pagecache,后续由 OS flush。 这些特性决定了 Kafka 没有做“确认机制”,而是直接将生产消息顺序写入文件、消息消费后不删除(避免文件更新),该实现充分利用了磁盘 IO,能够达到较高的吞吐量。代价是消费者要依赖 Zookeeper 记录队列消费位置、处理同步问题。没有消费确认机制,还导致了 Kafka 无法了解消费者速度,不能采用 push 模型以合理的速度向消费者推送数据,只能利用 pull 模型由消费者来拉消息(消费者承担额外的轮询开销)。 如果在 Kafka 中引入消费者确认机制,就需要 broker 维护消息消费状态,要做到高可靠就需要写文件持久化并与生产消息同步,这将急剧降低 Kafka 的性能,这种设计也极类似 RabbitMQ。如果不改变 Kafka 的实现,而是在 Kafka 和消费者之间做一层封装,还是需要实现一套类似 RabbitMQ 的消费确认和持久化机制。 参考资料:
推荐阅读
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 工程项目管理系统源码简洁+好用+全面工程项目管理系统
    ​​工程项目管理系统是指从事工程项目管理的企业(以下简称工程项目管理企业)受业主委托,按照合同约定,代表业主对工程项目的组织 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • Java中高级工程师面试必备:JVM核心知识点全面解析
    对于软件开发人员而言,随着技术框架的不断演进和成熟,许多高级功能已经被高度封装,使得初级开发者只需掌握基本用法即可迅速完成项目。然而,对于中高级工程师而言,深入了解Java虚拟机(JVM)的核心知识点是必不可少的。这不仅有助于优化性能和解决复杂问题,还能在面试中脱颖而出。本文将全面解析JVM的关键概念和技术细节,帮助读者全面提升技术水平。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • 字节Java高级岗:java开发cpu吃多线程吗
    前言抱着侥幸心理投了字节跳动后台JAVA开发岗,居然收到通知去面试,一面下整个人来都是懵逼的,不知道我对着面试官都说了些啥(捂脸~~)。侥幸一面居然过了,三天后接到二面通知,结果这 ... [详细]
  • Alibaba珍藏版mybatis手写文档,值得一读!
    一面问题:MySQLRedisKafka线程算法mysql知道哪些存储引擎,它们的区别mysql索引在什么情况下会失效mysql在项目中的优化场景&# ... [详细]
author-avatar
手机用户2502914831
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有