热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMQ并发消息消费机制详解

本文详细介绍了RocketMQ中的消息并发消费机制,包括消息拉取后的处理流程、消费服务的调用以及消费任务的具体执行过程。

在RocketMQ中,当消息被成功拉取后,这些消息会被存储到ProcessQueue中,随后通过调用consumeMessageService.submitConsumeRequest方法来通知消费服务进行消费。这一过程是异步的,有效地实现了消息消费与消息拉取的解耦。


RocketMQ支持两种消费模式:并发消费和顺序消费。本文重点探讨并发消费模式,其核心处理逻辑由ConsumeMessageConcurrentlyService服务负责。


ConsumeMessageConcurrentlyService.submitConsumeRequest方法的主要功能是创建一个消费请求ConsumeRequest,并将此请求提交给线程池进行处理。若单次消费的消息数量超过了预设的最大批次大小(默认值为1),则会将消息分批提交给线程池处理。


具体实现如下:


public void submitConsumeRequest(final List msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
final int cOnsumeBatchSize= this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total List msgThis = new ArrayList<>(consumeBatchSize);
for (int i = 0; i if (total msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}

ConsumeRequest.run方法负责实际的消息消费逻辑,主要包括以下几个步骤:



  1. 验证消息队列的有效性。如果队列已被标记为无效,则停止消费,并记录日志。

  2. 调用用户定义的消费监听器MessageListenerConcurrently来处理消息。

  3. 处理消费结果,根据不同的消费状态(如成功、失败或需重试)采取相应的措施。


消费结果的处理逻辑如下:


public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) {
case CONSUME_SUCCESS:
// 更新消费成功的统计信息
break;
case RECONSUME_LATER:
// 更新消费失败的统计信息
break;
default:
break;
}
// 处理不同消息模型下的消费结果
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播模式下直接丢弃失败的消息
break;
case CLUSTERING:
// 集群模式下将失败的消息重新发送回Broker
break;
default:
break;
}
// 移除已处理的消息并更新消费偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

在集群模式下,如果消息消费失败,系统会尝试将消息重新发送回Broker,并增加重试次数。如果消息再次发送失败,则会延迟一段时间后重新提交消费请求。


推荐阅读
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 数据库内核开发入门 | 搭建研发环境的初步指南
    本课程将带你从零开始,逐步掌握数据库内核开发的基础知识和实践技能,重点介绍如何搭建OceanBase的开发环境。 ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 本文详细介绍如何使用Samba软件配置CIFS文件共享服务,涵盖安装、配置、权限管理及多用户挂载等关键步骤。通过具体示例和命令行操作,帮助读者快速搭建并优化Samba服务器。 ... [详细]
  • 在维护公司项目时,发现按下手机的某个物理按键后会激活相应的服务,并在屏幕上模拟点击特定坐标点。本文详细介绍了如何使用ADB Shell Input命令来模拟各种输入事件,包括滑动、按键和点击等。 ... [详细]
  • andr ... [详细]
  • 本文由瀚高PG实验室撰写,详细介绍了如何在PostgreSQL中创建、管理和删除模式。文章涵盖了创建模式的基本命令、public模式的特性、权限设置以及通过角色对象简化操作的方法。 ... [详细]
  • MySQL索引详解与优化
    本文深入探讨了MySQL中的索引机制,包括索引的基本概念、优势与劣势、分类及其实现原理,并详细介绍了索引的使用场景和优化技巧。通过具体示例,帮助读者更好地理解和应用索引以提升数据库性能。 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 解决JAX-WS动态客户端工厂弃用问题并迁移到XFire
    在处理Java项目中的JAR包冲突时,我们遇到了JaxWsDynamicClientFactory被弃用的问题,并成功将其迁移到org.codehaus.xfire.client。本文详细介绍了这一过程及解决方案。 ... [详细]
  • 本文详细介绍了Linux系统中init进程的作用及其启动过程,解释了运行级别的概念,并提供了调整服务启动顺序的具体步骤和实例。通过了解这些内容,用户可以更好地管理系统的启动流程和服务配置。 ... [详细]
author-avatar
Th川_546
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有