热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMQ死信队列|消费者出现异常如何处理?

在RocketMQ重复消费问题|订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复

在RocketMQ 重复消费问题 | 订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复消费的问题。

今天要讨论的是消费者代码执行过程中出现异常,我们应该如何处理?


手动提交 offset

首先来看一段代码,Consumer 类是一个消费者类,它我们为它注册了一个监听器,在处理完消息之后,会将消息的状态返回给 RocketMQ,执行成功返回的是消息状态是 CONSUME_SUCCESS

public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer cOnsumer= new DefaultMQPushConsumer("test_consumer");
// 设置 NameServer 地址
consumer.setNamesrvAddr("");
// 订阅 Topic
consumer.subscribe("TopicTest", "*");
// 这次回调接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}

画一张图来表示向 RocketMQ 提交消息状态的流程,如图所示:

技术图片


消息者业务代码出现异常怎么办?

再来看一下消费者的代码中监听器的部分,它说如果消息处理成功,那么就返回消息状态为 CONSUME_SUCCESS,也有可能发放优惠券、积分等操作出现了异常,比如说数据库挂掉了。这个时候应该怎么处理呢?

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

我们可以把代码改一改,捕获异常之后返回消息的状态为 RECONSUME_LATER 表示稍后重试。

// 这次回调接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 对消息的处理,比如发放优惠券、积分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 万一发生数据库宕机等异常,返回稍后重试消息的状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});

重试队列

这个时候,消息会进入到 RocketMQ 的重试队列中。



  • 比如说消费者所属的消息组名称为AAAConsumerGroup

  • 其重试队列名称就叫做%RETRY%AAAConsumerGroup

  • 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列

  • 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列

技术图片


死信队列

  • 重试队列中的消息重试16次任然无法执行,将会进入到死信队列

  • 死信队列的名字是 %DLQ%AAAConsumerGroup

  • 死信队列中的消息可以后台开一个线程,订阅%DLQ%AAAConsumerGroup,并不停重试

技术图片


总结

技术图片

本文从消费者的业务代码出现异常讲起,介绍了 RocketMQ 的重试队列和死信队列:



  1. 代码正常执行返回消息状态为CONSUME_SUCCESS,执行异常返回RECONSUME_LATER

  2. 状态为RECONSUME_LATER的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName

  3. 重试16次消息任然没有处理成功,消息就会进入到死信队列%DLQ% + ConsumerGroupName;

RocketMQ 死信队列 | 消费者出现异常如何处理?



推荐阅读
  • C#的Type对象的简单应用
    通过Type对象可以获取类中所有的公有成员直接贴代码:classMyClass{privatestringname;privateintid;publicstringcity;pu ... [详细]
  • Android性能优化检测App卡顿
    在移动APP性能评测-流畅度评测中,我们介绍了如何准确客观评价APP的流畅度,最终采用SM指标来评价应用的流畅度,在知道如何评价流畅度之后 ... [详细]
  • FroggerTimeLimit:1000MSMemoryLimit:65536KTotalSubmissions:32257Accepted:10396DescriptionFr ... [详细]
  • 九宫格计算. ... [详细]
  • 本文分析和介绍了GLo ... [详细]
  • rbac 4表 常规设计
    rbac4表常规设计设计模型:1、管理员表(users)Schema::create('users',function(Blueprint$table){$tabl ... [详细]
  • 猫猫分享,必须精品原文地址:http:blog.csdn.netu013357243articledetails44571163素材地址:http:download.csdn.n ... [详细]
  • 2019.4.14第1001题:SumProblemProblemDescriptionHey,welcometoHDOJ(HangzhouDianziUniversityOnli ... [详细]
  • 接口自动化相关面试题
    你好,我是懂Java的测试最近辅导简历,有同学向我反馈,自学过接口自动化、没有落地接口自动化项目办?还有很多同学落地实践过自 ... [详细]
  • packagetest;importjava.io.FileInputStream;importjava.io.FileOutputStream;importjava.io.IOE ... [详细]
  • 读书这件事
    没事晒下自己的借书清单。60多本了,不知道自己一本子能读多少本?可惜的就是没写读书笔记,都是一些泛读。图书馆要 ... [详细]
  • phpstorm使用和配置技巧
    1.使用phpstorm的过程中,有时光标不小心变成了方块状,怎么修复回来呢?见下图,去掉“Useblockcare ... [详细]
  • IOSUITableView解析(一)
    UITableView的作用由于Iphone的大小有限,所以UITableView的作用是巨大的。比如QQ,微博等应用都用到了该控件。UITableVi ... [详细]
  • vector:在vc6中,如果要镶嵌使用vector,如vector,后面的两个应该用,空格隔开,否则被编译器认为是移位符string::npos的值为 ... [详细]
  • Spark 贝叶斯分类算法
    一、贝叶斯定理数学基础我们都知道条件概率的数学公式形式为即B发生的条件下A发生的概率等于A和B同时发生的概率除以B发生的概率。根据此公式变换,得到贝叶斯公式:即贝叶斯定律是关于随机 ... [详细]
author-avatar
陈可不能哭
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有