热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ高级:分布式事务详解案例——可靠消费(三)

阿巴阿巴阿巴.上篇博文讲述了分布式系统分布式事务的问题,引入RabbitMQ消息中间件的解决方案,详细演示了基于订单系统的可靠生产,

阿巴阿巴阿巴....  上篇博文讲述了分布式系统分布式事务的问题,引入RabbitMQ消息中间件的解决方案,详细演示了基于订单系统的可靠生产,本篇文章将用实际代码和例子,讲解消费者可靠消费问题 ,整体的架构代码翻阅上篇博文,这里就不一一完全贴出来了



前言

由于生产者和消费者不直接通信,生产者只负责把消息发送到队列,消费者负责从队列获取消息,消息被"消费"后,是需要从队列中删除的,那怎么确认消息被"成功消费"了呢?


消费者确认


消费者确认分两种:自动确认手动确认


在自动确认模式中,消息在发送到消费者后即被认为"成功消费",这种模式可以降低吞吐量(只要消费者可以跟上),以降低交付和消费者处理的安全性,这种模式通常被称为“即发即忘”,与手动确认模型不同,如果消费者的TCP连接或通道在真正的"成功消费"之前关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,并不适用于所有工作负载.



消费者过载

手动确认模式通常与有界信道预取(BasicQos方法)一起使用,该预取限制了信道上未完成(“进行中”)的消息数量。自动确认没有这种限制。因此消费者可能会被消息的发送速度所淹没,可能会导致消息积压并耗尽堆,或使操作系统终止其进程。


案例说明


上篇博文讲述了,订单服务的可靠生产问题,本篇文章,将继续改造配送中心的代码


分布式系统分布式事务解决方案——配送中心篇


 


dispatcher 配送中心系统相关配置增加

①. 配送中心系统pom.xml依赖


上篇博文中已经详细展示了配送中心的完整代码,本文例子将不会再逐一贴出,在原有依赖上,增加了mq、lombok、json格式化等依赖


org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-starter-jdbcmysqlmysql-connector-javaorg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-amqporg.springframework.amqpspring-rabbit-testtestnet.sf.json-libjson-lib2.2.3jdk15

②. application.yml 文件增加MQ的连接配置

spring:rabbitmq:host: 127.0.0.1username: wpf2password: 123port: 5672virtual-host: test_host

③ . 建立 Order 实体类,与订单服务系统中一致,主要用于接收消息的json转换

@Data
public class Order {// 订单IDprivate int orderId;// 用户名private String userName;// 商品内容private String context;// 购买数量private int num;
}

整体项目结构调整如下:


消费者:普通消费


基于配送中心的消息普通消费,消费者收到消息后即被认为"成功消费"


1. 编写 OrderMQConsumer 消费者类,监听订单队列,进行消息的消费

/**** @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用* @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理* 具体使用哪个方法处理,根据 MessageConverter转换后的参数类型 (message的类型匹配到哪个方法就交给哪个方法处理)*/
// 监听order_confirm_fanoutQueue队列,该队列在订单服务系统声明创建的
@Component
@RabbitListener(queues = {"order_confirm_fanoutQueue"})
public class OrderMQConsumer {@Autowiredprivate DispatchService dispatchService;private int count=1;// 接收消息@RabbitHandlerpublic void receiveMess(String message, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {System.out.println("接收到订单消息:"+message+",count:"+count++);// 2.获取订单信息:mq消息存的是json格式,需要转换回来Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);String orderId = order.getOrderId()+"";// 3.保存运单dispatchService.dispatch(orderId);}
}

2. 由于在上篇博文中,我们已经往消息队列中投递了2条消息,因此这里直接消费即可,不需要再启动订单系统的创建订单方法了

3. 查看数据库配送表结果

4. 查看图形化界面,队列消息的消费情况

结论:在消费者类中读取队列中的消息,根据订单ID,调用dispatchService.dispatch(orderId) 方法,往配送表插入了2条数据。


思考:MQ服务器宕机或其他因素,导致数据未入库怎么办?


生产环境往往存在很多风险,比如服务器宕机,系统重启等等


由运行结果可知,如果接收消息时发生异常,会触发服务器的重试机制,陷入死循环!如果是集群模式下,会造成MQ服务器奔溃,引发磁盘和内存消耗殆尽,知道服务器宕机为止!


消费者:可靠消费


解决上述代码异常,导致消息死循环重试的几种方案



  1. 控制重试的次数 + 死信队列
  2. try/catch + 手动ack
  3. try/catch + 手动ack + 死信队列处理 + 人工干预


方式一:配置消息重试次数

1. 在application.yml文件中配置MQ重试次数,如下

spring:rabbitmq:host: 127.0.0.1username: wpf2password: 123port: 5672virtual-host: test_hostlistener:simple:retry:enabled: true # 开启重试,默认是false关闭状态max-attempts: 3 # 最大重试次数,默认是3次initial-interval: 2000ms # 每次重试间隔时间

2. 启动订单服务,让其创建一条订单,并投递到消息队列

3. 查看消息投递结果

4. 启动配送中心服务,进行消息消费

5. 查看运行结果

6. 图形化队列消息结果:达到最大重试次数后,队列中的消息被抛弃,无法再次捞回

结论:由于我们配置了重试次数,因此消费消息时,即使发生了异常,也不会陷入死循环,不断的重试,最终导致系统奔溃,cpu飙升等情况


虽然能够解决死循环问题,但是这种情况会造成消息丢失,最终配送中心无法对这个订单进行入库等操作,从而造成 订单系统和 配送系统 数据不一致问题!



方式二:try/catch + 手动确认消息

1. 配送中心系统的application.yml 配置文件中配置开启手动ack

# 参数说明:none 不确认 auto 自动确认 manual 手动确认
acknowledge-mode: manual注意:之前的配置的重试策略的参数可以去除掉了,重试策略本质也是针对消息的确认即使没把重试的参数配置删除,也不会生效的,如果开启了手动ack

2. 接收消息代码如下

// 接收消息@RabbitHandlerpublic void receiveMess(String message, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {// 1.获取消息队列的消息System.out.println("接收到订单消息:"+message+",count:"+count++);// 2.获取订单信息:mq消息存的是json格式,需要转换回来Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);String orderId = order.getOrderId()+"";// 3.保存运单dispatchService.dispatch(orderId);System.out.println(1/0); // 出现异常// 4.收到ack告诉mq消息已经正常消费channel.basicAck(tag,false);}catch (Exception e){// 如果出现异常的情况下,根据实际的情况去进行重发/** @param1 : 传递标签,消息的tag* @param2 : 确认一条消息还是多条, false:只确认e.DeliverTag这条消息 true:确认小于等于e.DeliverTag的所有消息* @param3 : 消息失败了是否进行重发 * false:消息直接丢弃,不重发,如果绑定了死信队列,则消息打入死信队列* true:重发,设置为true,就不要加到try/catch代码中,否则会进入重发死循环*/channel.basicNack(tag,false,false);}}

了解三个代码关键处:

流程解析:


  1. 发生异常,流程进入到catch块
  2. channel.basicAck(tag,false); ,第一个参数是消息的标签,第二个参数是确认一条消息还是多条,我们设置的是false,表示只确认当前处理的这条消息,确认消费成功了
  3. catch块是针对消息处理的策略,准备如何处理这条消息?直接抛异常丢弃消息,还是触发消息的重新发送,具体需要根据业务进行处理

!! 注意:用了try/catch ,yml 配置了重试次数没有意义,try/catch会屏蔽掉重试策略!!


上述图第3步,try/catch中如果将 channel.basicNack(tag,false,false); 的第三个参数requeue设置为false,表示消息会直接丢弃,如果队列绑定了接盘侠死信队列,则消息会被转发到死信队列。如果requeue=true,则会进入重试死循环!


选择建议:用了try/catch ,就不要使用try/catch,二选其一即可


 ▎ 方式三:死信队列配置


如果消息没有正常消费,我们应该设置队列的接盘侠,专门处理这些异常消息,再由一个消费者去监听死信队列,针对消息做特殊处理,整体流程如下:


由于我们的队列绑定和声明是在订单服务中完成的,因此需要修改订单系统的代码,配送中心的代码保持方式二中的消费配置,不需要修改。

1. 声明创建死信组件

@Configuration
public class Order_RabbitConfiguration {// 1.配置死信交换机、队列&#64;Beanpublic FanoutExchange getDeadExchange(){return new FanoutExchange("dead_order_fanoutExchange",true,false);}&#64;Beanpublic Queue getDeadQueue(){return new Queue("dead_order_fanoutQueue",true);}&#64;Beanpublic Binding getDeadBinding(){return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange());}// 2.配置普通队列&#64;Beanpublic FanoutExchange getExchange(){return new FanoutExchange("order_confirm_fanoutExchange",true,false);}&#64;Beanpublic Queue getQueue(){// 设置消息接盘侠&#xff1a;队列已满、消息拒收、消息异常 等情况&#xff0c;该条消息就会被重新路由到死信队列Map args &#61; new HashMap<>();args.put("x-dead-letter-exchange","dead_order_fanoutExchange");return new Queue("order_confirm_fanoutQueue",true,false,false,args);}&#64;Beanpublic Binding getBinding(){return BindingBuilder.bind(getQueue()).to(getExchange());}
}

2. 去图形化界面中&#xff0c;删除原来的队列&#xff0c;由于我们增加了队列的接盘侠&#xff0c;因此重新设置属性的话&#xff0c;需要删除原来的队列&#xff0c;重新创建&#xff0c;否则启动会报异常

!! 注意&#xff1a;生产环境不建议这么做&#xff0c;最好是重新创建新的队列进行绑定&#xff0c;生产者路由到新队列中

3. 运行创建订单接口

4. 图形化界面查看&#xff0c; 消息投递情况和队列创建信息

5. 消费的代码不变&#xff0c;同样是try/catch &#43; 手动ack模式&#xff0c;并且制造一个异常

 6. 启动配送中心系统

 7. 查看图形化结果


监听死信队列


由于上述消费异常&#xff0c;订单队列绑定了接盘侠死信队列&#xff0c;未被正常消费成功的消息会被重新路由到死信队列&#xff0c;因此我们也需要监听死信队列&#xff0c;进行消息消费&#xff01;


在配送中心系统&#xff0c;新建死信队列的消费者

&#64;Component
&#64;RabbitListener(queues &#61; {"dead_order_fanoutQueue"}) // 监听死信队列
public class DeadOrderMQConsumer {&#64;Autowiredprivate DispatchService dispatchService;// 接收消息&#xff1a;代码与订单消费者一致&#64;RabbitHandlerpublic void receiveMess(String message, Channel channel, CorrelationData correlationData, &#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {// 1.获取消息队列的消息System.out.println("消息进入到了死信队列&#xff0c;开始处理异常消息&#xff1a;"&#43;message);// 2.获取订单信息&#xff1a;mq消息存的是json格式&#xff0c;需要转换回来Order order &#61; (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);String orderId &#61; order.getOrderId()&#43;"";// 3.保存运单dispatchService.dispatch(orderId);// 4.收到ack告诉mq消息已经正常消费channel.basicAck(tag,false);}catch (Exception e){// 由于消息进入了死信队列&#xff0c;说明消息有异常&#xff0c;需要采取新的措施来处理这条消息// 比如人为进行处理&#xff0c;同时也需要从队列中移除这条消息&#xff0c;防止死信队列堆积System.out.println("人工干预");System.out.println("发送邮件、短信通知技术人员等");System.out.println("将消息存入其他DB库&#xff0c;技术人员好根据消息排查");// 同样也要Nack这条消息&#xff0c;保障死信队列不会产生消息堆积channel.basicNack(tag,false,false);}}}

2. 重新启动配送系统

3. 查看图形化界面

结论&#xff1a;由上图可知&#xff0c;死信队列的消息被正常消费成功了&#xff0c;从队列中移除。死信队列的消费代码与订单消费者一致&#xff0c;只是在catch块的处理消息策略&#xff0c;需要额外增加其他处理机制


其他问题

yml 配置手动ACK后&#xff0c;消费时没有进行消息确认&#xff0c;会导致重复消费


我们知道配置重试策略&#xff0c;当达到最大重试次数&#xff0c;消息会从队列中自动删除&#xff0c;如果同时也配置了手动ack&#xff0c;但实际代码没有进行ack的设置&#xff0c;则达到最大重试次数后&#xff0c;消息不会被删除&#xff0c;而是从ready就绪状态&#xff0c;变更为未应答状态


1. 配置了2种

2. 消费代码如下&#xff0c;虽然配置了手动ack参数&#xff0c;但是代码中并没有手动确认

&#64;RabbitHandlerpublic void receiveMess(String message, Channel channel, CorrelationData correlationData, &#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {System.out.println("接收到订单消息&#xff1a;"&#43;message&#43;"&#xff0c;count&#xff1a;"&#43;count&#43;&#43;);// 2.获取订单信息&#xff1a;mq消息存的是json格式&#xff0c;需要转换回来Order order &#61; (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);String orderId &#61; order.getOrderId()&#43;"";// 3.保存运单dispatchService.dispatch(orderId);}

3. 控制台打印成功消费的信息&#xff0c;但是队列中消息并不会移除&#xff0c;而是从ready就绪状态&#xff0c;变更为未应答状态&#xff0c;重启项目&#xff0c;又会再次重复消费&#xff0c;直到有手动ack的消费者&#xff0c;将这条消息消费掉

!! 注意&#xff1a;由于执行了dispatchService.dispatch(orderId);&#xff0c;导致数据库创建了多条 99087的数据&#xff0c;因此需要注意&#xff0c;如果yml配置了手动确认ack&#xff0c;但代码消费时并没有确认消息就会造成重复消费&#xff01;


 


写在最后&#xff1a;生产环境可靠消费注意事项


可靠生产中&#xff0c;需要确保消息正确投递到队列中去&#xff0c;由于外界因素&#xff0c;网络波动导致处理延迟等因素&#xff0c;而可能会造成消息的投递失败&#xff0c;或者是多次投递。


例如订单服务投递消息成功了&#xff0c;但由于MQ服务器宕机&#xff0c;订单服务未及时收到消息投递的回执结果&#xff0c;触发消息的重试机制&#xff0c;消息被二次投递&#xff0c;实际消息队列中存在多条同一个订单消息记录。

结论&#xff1a;消费者在消费消息时&#xff0c;要保证数据的幂等性&#xff0c;不能重复消费同一个订单。


基于MQ的分布式事务解决方案总结


建议&#xff1a;最好是使用单体架构去处理&#xff0c;避免分布式事务&#xff0c;而非必要同步的非核心业务做成异步&#xff0c;提高响应速度&#xff01;


优点&#xff1a;


  1. 通用性强

  2. 拓展方便

  3. 耦合度低&#xff0c;方案也比较成熟

缺点&#xff1a;


  1. 基于消息中间件&#xff0c;只适合异步场景

  2. 消息会延迟处理&#xff0c;需要业务上能够容忍


推荐阅读
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 智能制造数据综合分析与应用解决方案
    在智能制造领域,生产数据通过先进的采集设备收集,并利用时序数据库或关系型数据库进行高效存储。这些数据经过处理后,通过可视化数据大屏呈现,为生产车间、生产控制中心以及管理层提供实时、精准的信息支持,助力不同应用场景下的决策优化和效率提升。 ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • 在 Linux 系统中,`/proc` 目录实现了一种特殊的文件系统,称为 proc 文件系统。与传统的文件系统不同,proc 文件系统主要用于提供内核和进程信息的动态视图,通过文件和目录的形式呈现。这些信息包括系统状态、进程细节以及各种内核参数,为系统管理员和开发者提供了强大的诊断和调试工具。此外,proc 文件系统还支持实时读取和修改某些内核参数,增强了系统的灵活性和可配置性。 ... [详细]
  • 西北工业大学作为陕西省三所985和211高校之一,虽然在农业和林业领域不如某些顶尖院校,但在航空航天领域的实力尤为突出。该校的计算机科学专业在科研和教学方面也具有显著优势,是考研的理想选择。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
  • 加密要用到Crypto安装包pipinstallCrypto新建两个模块rsautils.py,rsatest.py直接上代码,rsautils.py#!usrbinenv ... [详细]
  • 本文详细解析了JSONP(JSON with Padding)的跨域机制及其工作原理。JSONP是一种通过动态创建``标签来实现跨域请求的技术,其核心在于利用了浏览器对``标签的宽松同源策略。文章不仅介绍了JSONP的产生背景,还深入探讨了其具体实现过程,包括如何构造请求、服务器端如何响应以及客户端如何处理返回的数据。此外,还分析了JSONP的优势和局限性,帮助读者全面理解这一技术在现代Web开发中的应用。 ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 进程(Process)是指计算机中程序对特定数据集的一次运行活动,是系统资源分配与调度的核心单元,构成了操作系统架构的基础。在早期以进程为中心的计算机体系结构中,进程被视为程序的执行实例,其状态和控制信息通过任务描述符(task_struct)进行管理和维护。本文将深入探讨进程的概念及其关键数据结构task_struct,解析其在操作系统中的作用和实现机制。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
author-avatar
最棒小小丫_635
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有