对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿
网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: /springbootfanoutlistener:simple:retry:####开启消费者重试enabled: true####最大重试次数max-attempts: 5####重试间隔次数initial-interval: 3000
spring:rabbitmq:listener:simple:####开启手动ackacknowledge-mode: manual
/*** 描述:签收模式演示* @author: myx* @date: 2019-05-01* Copyright © 2019-grape. All rights reserved.*/
@Component
public class AckEamilConsumer {/*** 消费者* @param message*/@RabbitListener(queues = FanoutConstrant.QUEUE_EMAIL_NAME)public void process(Message message, @Headers Map
}
死信队列是当消息在一个队列因为下列原因:
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息
定义业务(普通)队列的时候指定参数
案例:
经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。
采用最终一致性原理。
需要保证以下三要素:
@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {&#64;Autowiredprivate OrderMapper orderMapper;&#64;Autowiredprivate RabbitTemplate rabbitTemplate;public ResponseBase addOrderAndDispatch() {OrderEntity orderEntity &#61; new OrderEntity();orderEntity.setName("订单名称");orderEntity.setOrderCreatetime(new Date());// 价格是300元orderEntity.setOrderMoney(300d);// 状态为 未支付orderEntity.setOrderState(0);Long commodityId &#61; 30l;// 商品idorderEntity.setCommodityId(commodityId);String orderId &#61; UUID.randomUUID().toString();orderEntity.setOrderId(orderId);// ##################################################// 1.先下单&#xff0c;创建订单 (往订单数据库中插入一条数据)int orderResult &#61; orderMapper.addOrder(orderEntity);System.out.println("orderResult:" &#43; orderResult);if (orderResult <&#61; 0) {return setResultError("下单失败!");}// 2.使用消息中间件将参数存在派单队列中send(orderId);return setResultSuccess();}private void send(String orderId) {JSONObject jsonObect &#61; new JSONObject();jsonObect.put("orderId", orderId);String msg &#61; jsonObect.toJSONString();System.out.println("msg:" &#43; msg);// 封装消息Message message &#61; MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId).build();// 构建回调返回的数据CorrelationData correlationData &#61; new CorrelationData(orderId);// 发送消息this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);}// 生产消息确认机制&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String orderId &#61; correlationData.getId();System.out.println("消息id:" &#43; correlationData.getId());if (ack) {System.out.println("消息发送确认成功");} else {send(orderId);System.out.println("消息发送确认失败:" &#43; cause);}}}
&#64;Component
public class CreateOrderConsumer {&#64;Autowiredprivate OrderMapper orderMapper;&#64;RabbitListener(queues &#61; "order_create_queue")public void process(Message message, &#64;Headers Map
}
&#64;Component
public class RabbitmqConfig {// 下单并且派单存队列public static final String ORDER_DIC_QUEUE &#61; "order_dic_queue";// 补单队列&#xff0c;判断订单是否已经被创建public static final String ORDER_CREATE_QUEUE &#61; "order_create_queue";// 下单并且派单交换机private static final String ORDER_EXCHANGE_NAME &#61; "order_exchange_name";// 1.定义订单队列&#64;Beanpublic Queue directOrderDicQueue() {return new Queue(ORDER_DIC_QUEUE);}// 2.定义补订单队列&#64;Beanpublic Queue directCreateOrderQueue() {return new Queue(ORDER_CREATE_QUEUE);}// 2.定义交换机&#64;BeanDirectExchange directOrderExchange() {return new DirectExchange(ORDER_EXCHANGE_NAME);}// 3.订单队列与交换机绑定&#64;BeanBinding bindingExchangeOrderDicQueue() {return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");}// 3.补单队列与交换机绑定&#64;BeanBinding bindingExchangeCreateOrder() {return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");}}
&#64;Component
public class DispatchConsumer {&#64;Autowiredprivate DispatchMapper dispatchMapper;&#64;RabbitListener(queues &#61; "order_dic_queue")public void process(Message message, &#64;Headers Map