热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMQ消息发送(二)、消息队列负载机制

消息生产者启动之后,我们就可以按照需要发送消息了,消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。查找主题的路由信息try



消息生产者启动之后,我们就可以按照需要发送消息了,消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。


查找主题的路由信息

tryToFindTopicPublishlnfo 是查找主题的路由信息的方法。如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存 或没有包含消息队列, 则向 NameServer 查询该 topic 的路由信息。如果最终未找到路由信息,则抛出异常: 无法找到主题相关路由信息异常。

Private TopicPublishInfo tryToFindTopicPublishInfo(final Stringtopic){
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if(null == topicPublishInfo || !topicPublishInfo.ok()){
this.topicPublishInfoTable.putIfAbsent(topic,newTopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo=this.topicPublishInfoTable.get(topic);
}

if(topicPublishInfo.isHaveTopicRouterInfo()||topicPublishInfo.ok()){
return topicPublishInfo;
}else{
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
returntopicPublishInfo;
}
}

第一次发送消息时,本地没有缓存 topic 的路由信息,查询 NameServer 尝试获取,如

果路由信息未找到,再次尝试用默认主题 DefaultMQProducerlmpl#createTopicKey 去查询,如果 BrokerConfig#autoCreateTopicEnable为true时,NameServer 将返回路由信息,如果autoCreateTopicEnab为false 将抛出无法找到 topic 路由异常。


消息生产者更新和维护路由缓存

消息生产者更新和维护路由缓存的操作在MQClientlnstance的updateTopicRoutelnfoFromNameServer方法里:

1、如果 isDdfault为true ,则使用 默认主题去查询,如果查询到路由信息,则 替换路由信息中读写队列个数为消息生产者默认的队列个数(defaultTopicQueueNums );如果 isDefault 为false ,则使用参数 topic 去查询;如果未 查询到路由信息,则返回 false ,表示 路由信息未变化。

TopicRouteDatatopicRouteData;
if(isDefault&&defaultMQProducer!=null){
topicRouteData=this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000*3);
if(topicRouteData!=null){
for(QueueDatadata:topicRouteData.getQueueDatas()){
intqueueNums=Math.min(defaultMQProducer.getDefaultTopicQueueNums(),data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}else{
topicRouteData=this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000*3);
}

2、如果路由信息找到,与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变, 如果未发生变化,则直接返回 false。

TopicRouteDataold=this.topicRouteTable.get(topic);
booleanchanged=topicRouteDataIsChange(old,topicRouteData);
if(!changed){
changed=this.isNeedUpdateTopicRouteInfo(topic);
}else{
log.info("thetopic[{}]routeinfochanged,old[{}],new[{}]",topic,old,topicRouteData);
}

3、更新 MQClientlnstance Broker 地址缓存表

4、根据 topicRouteData 中的 List 转换成topicPublishInfo的 List 列表,其具体实现在 topicRouteData2TopicPublishInfo, 然后会更新该 MQClientlnstance所管辖的所有消息发送关于 topic 的路由信息。

循环遍历路由 信息的 QueueData 信息,如果队列 没有写权 限,则继续遍历下 一个 QueueData ;根据 brokerName 找到 brokerData 信息 ,找不 到或没有找到 Master 节点,则 遍历下一个 QueueData ;根据写队列个数,根据 topic+序号 创建MessageQueue ,填充 topicPublishlnfo 的List.

至此,消息发送的路由查找就完成了。






  • 点赞



  • 收藏



  • 分享




    • 文章举报






J_Howie
发布了21 篇原创文章 · 获赞 11 · 访问量 1万+
私信

关注

推荐阅读
  • iOS 小组件开发指南
    本文详细介绍了iOS小部件(Widget)的开发流程,从环境搭建、证书配置到业务逻辑实现,提供了一系列实用的技术指导与代码示例。 ... [详细]
  • 本文详细介绍如何在Spring Boot项目中集成和使用JPA,涵盖JPA的基本概念、Spring Data JPA的功能以及具体的操作步骤,帮助开发者快速掌握这一强大的持久化技术。 ... [详细]
  • 了解如何有效清除远程桌面连接中的缓存记录,对于提升服务器安全性至关重要。本文将指导您完成这一过程。 ... [详细]
  • 本文介绍了进程的基本概念及其在操作系统中的重要性,探讨了进程与程序的区别,以及如何通过多进程实现并发和并行。文章还详细讲解了Python中的multiprocessing模块,包括Process类的使用方法、进程间的同步与异步调用、阻塞与非阻塞操作,并通过实例演示了进程池的应用。 ... [详细]
  • Cadence SPB 16.5 安装指南与注意事项
    本文提供了详细的 Cadence SPB 16.5 安装步骤,包括环境配置、安装过程中的关键步骤以及常见问题的解决方案。适合初次安装或遇到问题的技术人员参考。 ... [详细]
  • 本文主要解决了在编译CM10.2时出现的关于Samsung Exynos 4 HDMI HAL库中SecHdmiV4L2Utils.cpp文件的编译错误。 ... [详细]
  • [编程题] LeetCode上的Dynamic Programming(动态规划)类型的题目
    继上次把backTracking的题目做了一下之后:backTracking,我把LeetCode的动态规划的题目又做了一下,还有几道比较难的Medium的题和Hard的题没做出来,后面会继续 ... [详细]
  • Python闭包深度解析与应用实例
    本文详细介绍了Python闭包的基本概念、必要条件及其实现方式,并通过具体示例说明闭包在提高代码复用性和维护性方面的作用。文章最后还探讨了闭包的内部机制及其在实际项目中的应用。 ... [详细]
  • 深入解析mt_allocator内存分配器(二):多线程与单线程场景下的实现
    本文详细介绍了mt_allocator内存分配器在多线程和单线程环境下的实现机制。该分配器以2的幂次方字节为单位分配内存,支持灵活的配置和高效的性能。文章分为内存池特性描述、内存池实现、单线程内存池实现、内存池策略类实现及多线程内存池实现等部分,深入探讨了内存池的初始化、内存分配与回收的具体实现。 ... [详细]
  • 深入解析Nacos服务自动注册机制
    本文将探讨Nacos服务自动注册的具体实现方法,特别是如何通过Spring事件机制完成服务注册。通过对Nacos源码的详细分析,帮助读者理解其背后的原理。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统上安装中文宋体字体的方法,包括操作系统的环境配置、字体管理工具的安装、字体文件的传输与缓存重建等步骤。 ... [详细]
  • 本文介绍了如何在React应用中实现延迟加载以提高性能,以及如何利用自定义Hook和高阶组件(HOC)来增强组件功能。通过这些技术,开发者可以构建更加高效和可维护的应用。 ... [详细]
  • 本文介绍如何通过Java代码调用阿里云短信服务API来实现短信验证码的发送功能,包括必要的依赖添加和关键代码示例。 ... [详细]
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
  • Python网络编程:深入探讨TCP粘包问题及解决方案
    本文详细探讨了TCP协议下的粘包现象及其产生的原因,并提供了通过自定义报头解决粘包问题的具体实现方案。同时,对比了TCP与UDP协议在数据传输上的不同特性。 ... [详细]
author-avatar
忆丨残年_686
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有