热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitmq04rabbitmq解决分布式事务

1.RabbitMQ消息重试机制消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?答案:使用消息

1.RabbitMQ消息重试机制


  • 消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
    答案:使用消息重试机制。(springboot默认有消息重试机制)

1.1 如何合适选择重试机制


  1. 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)
  2. 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。

1.2 如何实现重试机制

对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿


2.解决幂等性


网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。



2.1 消费者如何保证消息幂等性,不被重复消费


  1. 使用全局MessageID判断消费方使用同一个,解决幂等性。
  2. 或者使用业务逻辑保证唯一(比如订单号码,保证一个订单号码只可能被插入一次数据库)

2.2 代码实现


2.2.1 生产者


  • FanoutIdemProducer

2.2.2 消费者


  • FanoutIdemEamilConsumer

2.2.3 配置文件

spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: /springbootfanoutlistener:simple:retry:####开启消费者重试enabled: true####最大重试次数max-attempts: 5####重试间隔次数initial-interval: 3000

3.RabbitMQ签收模式配置


3.1 开启手动应答模式

spring:rabbitmq:listener:simple:####开启手动ackacknowledge-mode: manual

3.2 消费者

/*** 描述:签收模式演示* @author: myx* @date: 2019-05-01* Copyright © 2019-grape. All rights reserved.*/
@Component
public class AckEamilConsumer {/*** 消费者* @param message*/@RabbitListener(queues = FanoutConstrant.QUEUE_EMAIL_NAME)public void process(Message message, @Headers Map headers, Channel channel) throws IOException {System.out.println(Thread.currentThread().getName()+ ",邮件消费者获取生产者消息msg:"+ new String(message.getBody(), "UTF-8")+ ",messageId:" + message.getMessageProperties().getMessageId());// 手动ackLong deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);// 手动签收channel.basicAck(deliveryTag, false);}
}

4.RabbitMQ死信队列


4.1 什么是死信队列

死信队列是当消息在一个队列因为下列原因:


  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
    变成了 “死信” 后被重新投递(publish)到另一个Exchange,该Exchange 就是DLX然后该Exchange 根据绑定规则转发到对应的队列上监听该队列 就可以重新消费.说白了就是没有被消费的消息换个地方重新被消费

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

4.2 应用场景分析

在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息


4.3 如何使用死信交换机

定义业务(普通)队列的时候指定参数


  • x-dead-letter-exchange: 用来设置死信后发送的交换机
  • x-dead-letter-routing-key:用来设置死信的routingKey

4.4 代码实现


4.4.1 配置


  • DeadFanoutConfig

4.4.2 邮件消费者


  • DeadEamilConsumer

4.4.3 死信消费者


  • DeadConsumer

5.RabbitMQ解决分布式事务

案例:


经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。



5.1 RabbitMQ解决分布式事务原理

采用最终一致性原理。
需要保证以下三要素:


  1. 确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
  2. MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
  3. 如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)

在这里插入图片描述


5.2 模拟代码实现


5.2.1 生产者

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {&#64;Autowiredprivate OrderMapper orderMapper;&#64;Autowiredprivate RabbitTemplate rabbitTemplate;public ResponseBase addOrderAndDispatch() {OrderEntity orderEntity &#61; new OrderEntity();orderEntity.setName("订单名称");orderEntity.setOrderCreatetime(new Date());// 价格是300元orderEntity.setOrderMoney(300d);// 状态为 未支付orderEntity.setOrderState(0);Long commodityId &#61; 30l;// 商品idorderEntity.setCommodityId(commodityId);String orderId &#61; UUID.randomUUID().toString();orderEntity.setOrderId(orderId);// ##################################################// 1.先下单&#xff0c;创建订单 (往订单数据库中插入一条数据)int orderResult &#61; orderMapper.addOrder(orderEntity);System.out.println("orderResult:" &#43; orderResult);if (orderResult <&#61; 0) {return setResultError("下单失败!");}// 2.使用消息中间件将参数存在派单队列中send(orderId);return setResultSuccess();}private void send(String orderId) {JSONObject jsonObect &#61; new JSONObject();jsonObect.put("orderId", orderId);String msg &#61; jsonObect.toJSONString();System.out.println("msg:" &#43; msg);// 封装消息Message message &#61; MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId).build();// 构建回调返回的数据CorrelationData correlationData &#61; new CorrelationData(orderId);// 发送消息this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);}// 生产消息确认机制&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String orderId &#61; correlationData.getId();System.out.println("消息id:" &#43; correlationData.getId());if (ack) {System.out.println("消息发送确认成功");} else {send(orderId);System.out.println("消息发送确认失败:" &#43; cause);}}}

5.2.2 补单消费者

&#64;Component
public class CreateOrderConsumer {&#64;Autowiredprivate OrderMapper orderMapper;&#64;RabbitListener(queues &#61; "order_create_queue")public void process(Message message, &#64;Headers Map headers, Channel channel) throws Exception {String messageId &#61; message.getMessageProperties().getMessageId();String msg &#61; new String(message.getBody(), "UTF-8");System.out.println("补单消费者" &#43; msg &#43; ",消息id:" &#43; messageId);JSONObject jsonObject &#61; JSONObject.parseObject(msg);String orderId &#61; jsonObject.getString("orderId");// 判断订单是否存在&#xff0c;如果不存在 实现自动补单机制OrderEntity orderEntityResult &#61; orderMapper.findOrderId(orderId);if (orderEntityResult !&#61; null) {System.out.println("订单已经存在 无需补单 orderId:" &#43; orderId);return;}// 订单不存在 &#xff0c;则需要进行补单OrderEntity orderEntity &#61; new OrderEntity();orderEntity.setName("订单名称");orderEntity.setOrderCreatetime(new Date());// 价格是300元orderEntity.setOrderMoney(300d);// 状态为 未支付orderEntity.setOrderState(0);Long commodityId &#61; 30l;// 商品idorderEntity.setCommodityId(commodityId);orderEntity.setOrderId(orderId);// ##################################################// 1.先下单&#xff0c;创建订单 (往订单数据库中插入一条数据)try {int orderResult &#61; orderMapper.addOrder(orderEntity);System.out.println("orderResult:" &#43; orderResult);if (orderResult >&#61; 0) {// 手动签收消息,通知mq服务器端删除该消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {// 丢弃该消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}

5.2.3 RabbitmqConfig

&#64;Component
public class RabbitmqConfig {// 下单并且派单存队列public static final String ORDER_DIC_QUEUE &#61; "order_dic_queue";// 补单队列&#xff0c;判断订单是否已经被创建public static final String ORDER_CREATE_QUEUE &#61; "order_create_queue";// 下单并且派单交换机private static final String ORDER_EXCHANGE_NAME &#61; "order_exchange_name";// 1.定义订单队列&#64;Beanpublic Queue directOrderDicQueue() {return new Queue(ORDER_DIC_QUEUE);}// 2.定义补订单队列&#64;Beanpublic Queue directCreateOrderQueue() {return new Queue(ORDER_CREATE_QUEUE);}// 2.定义交换机&#64;BeanDirectExchange directOrderExchange() {return new DirectExchange(ORDER_EXCHANGE_NAME);}// 3.订单队列与交换机绑定&#64;BeanBinding bindingExchangeOrderDicQueue() {return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");}// 3.补单队列与交换机绑定&#64;BeanBinding bindingExchangeCreateOrder() {return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");}}

5.2.4 派单服务-消费者

&#64;Component
public class DispatchConsumer {&#64;Autowiredprivate DispatchMapper dispatchMapper;&#64;RabbitListener(queues &#61; "order_dic_queue")public void process(Message message, &#64;Headers Map headers, Channel channel) throws Exception {String messageId &#61; message.getMessageProperties().getMessageId();String msg &#61; new String(message.getBody(), "UTF-8");System.out.println("派单服务平台" &#43; msg &#43; ",消息id:" &#43; messageId);JSONObject jsonObject &#61; JSONObject.parseObject(msg);String orderId &#61; jsonObject.getString("orderId");if (StringUtils.isEmpty(orderId)) {// 日志记录return;}DispatchEntity dispatchEntity &#61; new DispatchEntity();// 订单iddispatchEntity.setOrderId(orderId);// 外卖员iddispatchEntity.setTakeoutUserId(12l);// 外卖路线dispatchEntity.setDispatchRoute("40,40");try {int insertDistribute &#61; dispatchMapper.insertDistribute(dispatchEntity);if (insertDistribute > 0) {// 手动签收消息,通知mq服务器端删除该消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {e.printStackTrace();// // 丢弃该消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}}

推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • C#编程指南:实现列表与WPF数据网格的高效绑定方法 ... [详细]
  • 本文详细解析了 MySQL 5.7.20 版本中二进制日志(binlog)崩溃恢复机制的工作流程。假设使用 InnoDB 存储引擎,并且启用了 `sync_binlog=1` 配置,文章深入探讨了在系统崩溃后如何通过 binlog 进行数据恢复,确保数据的一致性和完整性。 ... [详细]
  • Android ListView 自定义 CheckBox 实现列表项多选功能详解
    本文详细介绍了在Android开发中如何在ListView的每一行添加CheckBox,以实现列表项的多选功能。用户不仅可以通过点击复选框来选择项目,还可以通过点击列表的任意一行来完成选中操作,提升了用户体验和操作便捷性。同时,文章还探讨了相关的事件处理机制和布局优化技巧,帮助开发者更好地实现这一功能。 ... [详细]
  • 本文探讨了将PEBuilder转换为DIBooter.sh的方法,重点介绍了如何将DI工具集成到启动层,实现离线镜像引导安装。通过使用DD命令替代传统的grub-install工具,实现了GRUB的离线安装。此外,还详细解析了bootice工具的工作原理及其在该过程中的应用,确保系统在无网络环境下也能顺利引导和安装。 ... [详细]
  • 随着越来越多的应用程序采用JSON格式作为响应数据,基于Spring Framework构建的服务端应用也广泛采用了这一实践。本文将详细介绍如何在Spring 4.x版本的MVC框架中配置和实现HTTP请求返回JSON数据流,涵盖相关配置、依赖管理和代码示例,帮助开发者高效地实现这一功能。 ... [详细]
  • Spring框架下发送嵌入图片邮件时遇到的技术挑战与解决方案
    在Spring框架中发送嵌入图片的HTML格式邮件时,常遇到技术挑战。一种有效的解决方案是在邮件内容中直接使用``标签来引用图片。此外,还可以通过MimeMessageHelper类的addInline方法将图片作为内联资源添加到邮件中,确保图片能够正确显示。这种方法不仅提高了邮件的可读性,还增强了用户体验。 ... [详细]
  • 在 Golang 应用中,频繁出现的 TIME_WAIT 和 ESTABLISHED 状态可能会导致性能瓶颈。本文探讨了这些状态产生的原因,并提出了优化与解决策略。通过调整内核参数、优化连接管理和使用连接池技术,可以有效减少 TIME_WAIT 的数量,提高应用的并发处理能力。同时,对于 ESTABLISHED 状态,可以通过合理的超时设置和错误处理机制,确保连接的高效利用和快速释放。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
  • 本文深入探讨了Spring Cloud Eureka在企业级应用中的高级使用场景及优化策略。首先,介绍了Eureka的安全配置,确保服务注册与发现过程的安全性。接着,分析了Eureka的健康检查机制,提高系统的稳定性和可靠性。随后,详细讨论了Eureka的各项参数调优技巧,以提升性能和响应速度。最后,阐述了如何实现Eureka的高可用性部署,保障服务的连续性和可用性。通过这些内容,开发者可以更好地理解和运用Eureka,提升微服务架构的整体效能。 ... [详细]
  • 掌握DSP必备的56个核心问题,我已经将其收藏以备不时之需! ... [详细]
  • 本讲深入探讨了 Python 中集合的基本概念及其符号表示方法,通过实例代码详细解析了如何将列表转换为集合,并展示了集合在数据处理中的独特优势。 ... [详细]
author-avatar
_ZY寶貝_
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有