热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMQ并发消息消费机制详解

本文详细介绍了RocketMQ中的消息并发消费机制,包括消息拉取后的处理流程、消费服务的调用以及消费任务的具体执行过程。

在RocketMQ中,当消息被成功拉取后,这些消息会被存储到ProcessQueue中,随后通过调用consumeMessageService.submitConsumeRequest方法来通知消费服务进行消费。这一过程是异步的,有效地实现了消息消费与消息拉取的解耦。


RocketMQ支持两种消费模式:并发消费和顺序消费。本文重点探讨并发消费模式,其核心处理逻辑由ConsumeMessageConcurrentlyService服务负责。


ConsumeMessageConcurrentlyService.submitConsumeRequest方法的主要功能是创建一个消费请求ConsumeRequest,并将此请求提交给线程池进行处理。若单次消费的消息数量超过了预设的最大批次大小(默认值为1),则会将消息分批提交给线程池处理。


具体实现如下:


public void submitConsumeRequest(final List msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
final int cOnsumeBatchSize= this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total List msgThis = new ArrayList<>(consumeBatchSize);
for (int i = 0; i if (total msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}

ConsumeRequest.run方法负责实际的消息消费逻辑,主要包括以下几个步骤:



  1. 验证消息队列的有效性。如果队列已被标记为无效,则停止消费,并记录日志。

  2. 调用用户定义的消费监听器MessageListenerConcurrently来处理消息。

  3. 处理消费结果,根据不同的消费状态(如成功、失败或需重试)采取相应的措施。


消费结果的处理逻辑如下:


public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) {
case CONSUME_SUCCESS:
// 更新消费成功的统计信息
break;
case RECONSUME_LATER:
// 更新消费失败的统计信息
break;
default:
break;
}
// 处理不同消息模型下的消费结果
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播模式下直接丢弃失败的消息
break;
case CLUSTERING:
// 集群模式下将失败的消息重新发送回Broker
break;
default:
break;
}
// 移除已处理的消息并更新消费偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

在集群模式下,如果消息消费失败,系统会尝试将消息重新发送回Broker,并增加重试次数。如果消息再次发送失败,则会延迟一段时间后重新提交消费请求。


推荐阅读
  • 本文章介绍了如何将阿拉伯数字形式的金额转换为中国传统的大写形式,适用于财务报告和正式文件中的金额表示。 ... [详细]
  • 在上一期文章中,我们探讨了FastDev4Android项目中PullToRefreshListView组件的使用方法。本期将继续探讨该框架中的另一个重要组件——ACache数据缓存器,详细介绍其工作原理及如何在项目中有效利用。 ... [详细]
  • JobScheduler5.0源码分析
    0.JobScheduler执行代码mJobScheduler(JobScheduler)getSystemService(Context.JOB_SCHEDULER_SERVICE); ... [详细]
  • 本文介绍了如何在Java中使用`JCheckBoxMenuItem.setMnemonic()`方法,并提供了多个实际应用的代码示例。 ... [详细]
  • 本文详细介绍了Java中`org.sakaiproject.site.api.Site.addPage()`方法的功能和使用方法,并提供了多个实际项目中的代码示例。 ... [详细]
  • 本文介绍了NHibernate中通过定义接口和实现类来管理会话工厂的方法,包括接口的优势、模型文件夹的结构以及具体的代码示例。 ... [详细]
  • Linux双网卡绑定技术详解与实践
    本文详细介绍了如何在Linux系统中实现双网卡绑定,即将两块物理网卡合并为一个逻辑网卡,以提高网络性能和可靠性。文中不仅涵盖了基本的概念,还提供了具体的配置步骤和测试方法。 ... [详细]
  • VSCode中使用Clang-Format进行C/C++代码格式化配置
    本文介绍了如何在VSCode中配置Clang-Format以实现C/C++代码的自动格式化,包括安装必要的扩展、配置文件的创建以及常用设置的解释。建议阅读官方文档以获取更多详细信息。 ... [详细]
  • 本文探讨了Java异常处理的本质,提出了设计模式以优化异常处理,并分析了在AOP模型中异常处理的应用。文章强调了正确使用Java异常对于提升代码质量和维护性的关键作用。 ... [详细]
  • 本文通过对OkHttp源码的详细解读,旨在帮助读者理解其核心执行流程,特别是同步与异步请求的处理方式。文中不仅涵盖了基本的使用示例,还深入探讨了OkHttp的核心功能——拦截器链的工作原理。 ... [详细]
  • 使用URLHttpConnection获取并展示图片至ImageView的方法
    本文介绍如何通过URLHttpConnection方式从网络加载图片,并将其显示在Android应用的ImageView组件上。包括布局文件和Java代码的具体实现。 ... [详细]
  • 深入解析 Android 中的 ActivityGroup 实现
    本文详细探讨了如何在 Android 应用中使用 ActivityGroup 来实现类似微博客户端主界面的效果,并分析了 TabActivity 的局限性,推荐使用更为灵活的 ActivityGroup 方案。 ... [详细]
  • 本文讨论了在使用表单上传文件时遇到的值为空问题,并提供了几种有效的解决方案。 ... [详细]
  • 本文详细介绍了MySQL表分区的概念、类型及其在实际应用中的实施方法,特别是针对Zabbix数据库的优化策略。 ... [详细]
  • 本文介绍如何使用Java实现AC自动机(Aho-Corasick算法),以实现高效的多模式字符串匹配。文章涵盖了Trie树和KMP算法的基础知识,并提供了一个详细的代码示例,包括构建Trie树、设置失败指针以及执行搜索的过程。 ... [详细]
author-avatar
Th川_546
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有