热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMQ并发消息消费机制详解

本文详细介绍了RocketMQ中的消息并发消费机制,包括消息拉取后的处理流程、消费服务的调用以及消费任务的具体执行过程。

在RocketMQ中,当消息被成功拉取后,这些消息会被存储到ProcessQueue中,随后通过调用consumeMessageService.submitConsumeRequest方法来通知消费服务进行消费。这一过程是异步的,有效地实现了消息消费与消息拉取的解耦。


RocketMQ支持两种消费模式:并发消费和顺序消费。本文重点探讨并发消费模式,其核心处理逻辑由ConsumeMessageConcurrentlyService服务负责。


ConsumeMessageConcurrentlyService.submitConsumeRequest方法的主要功能是创建一个消费请求ConsumeRequest,并将此请求提交给线程池进行处理。若单次消费的消息数量超过了预设的最大批次大小(默认值为1),则会将消息分批提交给线程池处理。


具体实现如下:


public void submitConsumeRequest(final List msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
final int cOnsumeBatchSize= this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total List msgThis = new ArrayList<>(consumeBatchSize);
for (int i = 0; i if (total msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}

ConsumeRequest.run方法负责实际的消息消费逻辑,主要包括以下几个步骤:



  1. 验证消息队列的有效性。如果队列已被标记为无效,则停止消费,并记录日志。

  2. 调用用户定义的消费监听器MessageListenerConcurrently来处理消息。

  3. 处理消费结果,根据不同的消费状态(如成功、失败或需重试)采取相应的措施。


消费结果的处理逻辑如下:


public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) {
case CONSUME_SUCCESS:
// 更新消费成功的统计信息
break;
case RECONSUME_LATER:
// 更新消费失败的统计信息
break;
default:
break;
}
// 处理不同消息模型下的消费结果
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播模式下直接丢弃失败的消息
break;
case CLUSTERING:
// 集群模式下将失败的消息重新发送回Broker
break;
default:
break;
}
// 移除已处理的消息并更新消费偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

在集群模式下,如果消息消费失败,系统会尝试将消息重新发送回Broker,并增加重试次数。如果消息再次发送失败,则会延迟一段时间后重新提交消费请求。


推荐阅读
  • 本文深入探讨了UNIX/Linux系统中的进程间通信(IPC)机制,包括消息传递、同步和共享内存等。详细介绍了管道(Pipe)、有名管道(FIFO)、Posix和System V消息队列、互斥锁与条件变量、读写锁、信号量以及共享内存的使用方法和应用场景。 ... [详细]
  • 深入理解OAuth认证机制
    本文介绍了OAuth认证协议的核心概念及其工作原理。OAuth是一种开放标准,旨在为第三方应用提供安全的用户资源访问授权,同时确保用户的账户信息(如用户名和密码)不会暴露给第三方。 ... [详细]
  • 扫描线三巨头 hdu1928hdu 1255  hdu 1542 [POJ 1151]
    学习链接:http:blog.csdn.netlwt36articledetails48908031学习扫描线主要学习的是一种扫描的思想,后期可以求解很 ... [详细]
  • 深入探讨CPU虚拟化与KVM内存管理
    本文详细介绍了现代服务器架构中的CPU虚拟化技术,包括SMP、NUMA和MPP三种多处理器结构,并深入探讨了KVM的内存虚拟化机制。通过对比不同架构的特点和应用场景,帮助读者理解如何选择最适合的架构以优化性能。 ... [详细]
  • 最近团队在部署DLP,作为一个技术人员对于黑盒看不到的地方还是充满了好奇心。多次咨询乙方人员DLP的算法原理是什么,他们都以商业秘密为由避而不谈,不得已只能自己查资料学习,于是有了下面的浅见。身为甲方,虽然不需要开发DLP产品,但是也有必要弄明白DLP基本的原理。俗话说工欲善其事必先利其器,只有在懂这个工具的原理之后才能更加灵活地使用这个工具,即使出现意外情况也能快速排错,越接近底层,越接近真相。根据DLP的实际用途,本文将DLP检测分为2部分,泄露关键字检测和近似重复文档检测。 ... [详细]
  • 本题探讨如何通过最大流算法解决农场排水系统的设计问题。题目要求计算从水源点到汇合点的最大水流速率,使用经典的EK(Edmonds-Karp)和Dinic算法进行求解。 ... [详细]
  • 本文探讨了 Spring Boot 应用程序在不同配置下支持的最大并发连接数,重点分析了内置服务器(如 Tomcat、Jetty 和 Undertow)的默认设置及其对性能的影响。 ... [详细]
  • 数据结构入门:栈的基本概念与操作
    本文详细介绍了栈这一重要的数据结构,包括其基本概念、顺序存储结构、栈的基本操作(如入栈、出栈、清空栈和销毁栈),以及如何利用栈实现二进制到十进制的转换。通过具体代码示例,帮助读者更好地理解和应用栈的相关知识。 ... [详细]
  • 本文探讨了如何通过预处理器开关选择不同的类实现,并解决在特定情况下遇到的链接器错误。 ... [详细]
  • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
    本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
  • 深入解析 RocketMQ 的架构与应用
    本文详细介绍了 RocketMQ 的核心特性、系统架构、部署模式以及如何编写生产者和消费者的代码,通过具体案例探讨了其在实际项目中的应用。 ... [详细]
  • MainActivityimportandroid.app.Activity;importandroid.os.Bundle;importandroid.os.Handler;im ... [详细]
  • 本文探讨了随着并发需求的增长,MySQL数据库架构如何从简单的单一实例发展到复杂的分布式系统,以及每一步演进背后的原理和技术解决方案。 ... [详细]
  • 收割机|篇幅_国内最牛逼的笔记,不接受反驳!!
    收割机|篇幅_国内最牛逼的笔记,不接受反驳!! ... [详细]
  • 本文探讨了在C#服务中捕获控制台输出的有效方法,特别是在远程系统部署的应用场景下。文中不仅提供了基础的解决方案,还深入讨论了最佳实践,如使用日志库和事件日志等。 ... [详细]
author-avatar
Th川_546
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有