热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

rocketmq消费负载均衡--push消费详解

这篇文章主要介绍了rocketmq消费负载均衡--push消费详解,本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点。,需要的朋友可以参考下

前言

本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点。本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。

介绍之前首先抛出几个问题:

1. 要做负载均衡,首先要解决的一个问题是什么?

2. 负载均衡是Client端处理还是Broker端处理?

个人理解:

1. 要做负载均衡,首先要做的就是信号收集。

所谓信号收集,就是得知道每一个consumerGroup有哪些consumer,对应的topic是谁。信号收集分为Client端信号收集与Broker端信号收集两个部分。

2. 负载均衡放在Client端处理。

具体做法是:消费者客户端在启动时完善rebalanceImpl实例,同时拷贝订阅信息存放rebalanceImpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有Broker,注册RegisterConsumer,等待上述过程准备好之后在Client端不断执行的负载均衡服务线程从Broker端获取一份全局信息(该consumerGroup下所有的消费Client),然后分配这些全局信息,获取当前客户端分配到的消费队列。

本文具体的内容:

I. copySubscription

Client端信号收集,拷贝订阅信息。

在DefaultMQPushConsumerImpl.start()时,会将消费者的topic订阅关系设置到rebalanceImpl的SubscriptionInner的map中用于负载:

private void copySubscription() throws MQClientException {
try {
//注:一个consumer对象可以订阅多个topic
Map sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptiOnData=
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptiOnData=
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

FilterAPI.buildSubscriptionData接口将订阅关系转换为SubscriptionData 数据,其中subString包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。

II. 完善rebalanceImpl实例

Client继续收集信息:

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

本文以DefaultMQPushConsumerImpl为例,因此this对象类型为DefaultMQPushConsumerImp。

III. this.rebalanceService.start()

开启负载均衡服务。this.rebalanceService是一个RebalanceService实例对象,它继承与ServiceThread,是一个线程类。 this.rebalanceService.start()执行时,也即执行RebalanceService线程体:

@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}

IV. this.mqClientFactory.doRebalance

客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:

public void doRebalance() {
for (String group : this.consumerTable.keySet()) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}

V. MQConsumerInner.doRebalance

由于本文以DefaultMQPushConsumerImpl消费过程为例,即DefaultMQPushConsumerImpl.doRebalance:

@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance();
}
}

步骤II 中完善了rebalanceImpl实例,为调用rebalanceImpl.doRebalance()提供了初始数据。

rebalanceImpl.doRebalance()过程如下:

public void doRebalance() {
     // 前文copySubscription中初始化了SubscriptionInner
Map subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}

VI. rebalanceByTopic -- 核心步骤之一

rebalanceByTopic方法中根据消费者的消费类型为BROADCASTING或CLUSTERING做不同的逻辑处理。CLUSTERING逻辑包括BROADCASTING逻辑,本部分只介绍集群消费负载均衡的逻辑。

集群消费负载均衡逻辑主要代码如下(省略了log等代码):

//1.从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
Set mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 从broker端获取消费该消费组的所有客户端clientId
List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
List mqAll = new ArrayList();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);

     // 3.创建DefaultMQPushConsumer对象时默认设置为AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List allocateResult = null;
try {
         // 4.调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
allocateResult = strategy.allocate(
this.consumerGroup, 
this.mQClientFactory.getClientId(), 
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
    // 5. 将分配得到的allocateResult 中的队列放入allocateResultSet 集合
Set allocateResultSet = new HashSet();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
、
     //6. 更新updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}

注:BROADCASTING逻辑只包含上述的1、6。

集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:

第1点:从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列

第2点: 从broker端获取消费该消费组的所有客户端clientId

首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端ChannelInfoTable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:

第4点:调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列

注:上图中cId1、cId2、...、cIdN通过 getConsumerIdListByGroup 获取,它们在这个ConsumerGroup下所有在线客户端列表中。

当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • 深入理解一致性哈希算法及其应用
    本文详细介绍了分布式系统中的一致性哈希算法,探讨其原理、优势及应用场景,帮助读者全面掌握这一关键技术。 ... [详细]
  • 在项目中使用 Redis 时,了解其不同架构模式(如单节点、主从复制、哨兵模式和集群)对于确保系统的高可用性和扩展性至关重要。本文将详细探讨这些模式的特点和应用场景。 ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • 本文作为SpringCloud Alibaba系列教程的第一部分,主要介绍如何搭建SpringCloud Alibaba的开发环境,帮助初学者快速入门。SpringCloud Alibaba是由阿里巴巴团队开源的一套微服务工具集,旨在简化分布式系统的构建过程。 ... [详细]
  • 堆是一种常见的数据结构,广泛应用于计算机科学领域。它通常表示为一棵完全二叉树,并可通过数组实现。堆的主要特性是每个节点的值与其父节点的值之间存在特定的关系,这使得堆在优先队列和排序算法中非常有用。 ... [详细]
  • 深入剖析 DEX 赛道:从 60 大头部项目看五大趋势
    本文通过分析 60 大头部去中心化交易平台(DEX),揭示了当前 DEX 赛道的五大发展趋势,包括市场集中度、跨链协议、AMM+NFT 结合、新公链崛起以及稳定币和衍生品交易的增长潜力。 ... [详细]
  • 探索电路与系统的起源与发展
    本文回顾了电路与系统的发展历程,从电的早期发现到现代电子器件的应用。文章不仅涵盖了基础理论和关键发明,还探讨了这一学科对计算机、人工智能及物联网等领域的深远影响。 ... [详细]
  • Linux 学习路径与核心框架
    本文提供了一套系统化的 Linux 学习路径,旨在帮助初学者和中级用户构建全面的知识体系。通过逐步深入的学习方法,掌握从基础命令到高级系统管理的技能。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 2018年3月31日,CSDN、火星财经联合中关村区块链产业联盟等机构举办的2018区块链技术及应用峰会(BTA)核心分会场圆满举行。多位业内顶尖专家深入探讨了区块链的核心技术原理及其在实际业务中的应用。 ... [详细]
  • 开发笔记:9.八大排序
    开发笔记:9.八大排序 ... [详细]
  • 在使用 Flutter 进行开发时,可能会遇到热更新功能无法正常工作的问题。本文将探讨一种常见的错误:无法连接到 Dart 观察站,并提供详细的解决方法。 ... [详细]
  • 本书系统介绍了概率论的基础概念,包括样本空间、事件及其概率定义,条件概率和独立性,并深入探讨了贝叶斯公式。随后,书中详细讲解了多种类型的随机变量,如离散型(伯努利、二项、几何、泊松)和连续型(均匀、指数、伽玛、正态)。此外,还涵盖了随机变量的期望值计算、联合分布特性、矩母函数以及极限定理等内容。特别地,本书对马尔可夫链、泊松过程及其扩展形式进行了详尽分析,为读者提供了丰富的理论依据和实际应用案例。 ... [详细]
  • Nginx 反向代理与负载均衡实验
    本实验旨在通过配置 Nginx 实现反向代理和负载均衡,确保从北京本地代理服务器访问上海的 Web 服务器时,能够依次显示红、黄、绿三种颜色页面以验证负载均衡效果。 ... [详细]
author-avatar
tannn2502886701
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有