作者:忆丨残年_686 | 来源:互联网 | 2023-09-18 09:39
消息生产者启动之后,我们就可以按照需要发送消息了,消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。查找主题的路由信息try
消息生产者启动之后,我们就可以按照需要发送消息了,消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。
查找主题的路由信息
tryToFindTopicPublishlnfo 是查找主题的路由信息的方法。如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存 或没有包含消息队列, 则向 NameServer 查询该 topic 的路由信息。如果最终未找到路由信息,则抛出异常: 无法找到主题相关路由信息异常。
Private TopicPublishInfo tryToFindTopicPublishInfo(final Stringtopic){
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if(null == topicPublishInfo || !topicPublishInfo.ok()){
this.topicPublishInfoTable.putIfAbsent(topic,newTopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo=this.topicPublishInfoTable.get(topic);
}
if(topicPublishInfo.isHaveTopicRouterInfo()||topicPublishInfo.ok()){
return topicPublishInfo;
}else{
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
returntopicPublishInfo;
}
}
第一次发送消息时,本地没有缓存 topic 的路由信息,查询 NameServer 尝试获取,如
果路由信息未找到,再次尝试用默认主题 DefaultMQProducerlmpl#createTopicKey 去查询,如果 BrokerConfig#autoCreateTopicEnable为true时,NameServer 将返回路由信息,如果autoCreateTopicEnab为false 将抛出无法找到 topic 路由异常。
消息生产者更新和维护路由缓存
消息生产者更新和维护路由缓存的操作在MQClientlnstance的updateTopicRoutelnfoFromNameServer方法里:
1、如果 isDdfault为true ,则使用 默认主题去查询,如果查询到路由信息,则 替换路由信息中读写队列个数为消息生产者默认的队列个数(defaultTopicQueueNums );如果 isDefault 为false ,则使用参数 topic 去查询;如果未 查询到路由信息,则返回 false ,表示 路由信息未变化。
TopicRouteDatatopicRouteData;
if(isDefault&&defaultMQProducer!=null){
topicRouteData=this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000*3);
if(topicRouteData!=null){
for(QueueDatadata:topicRouteData.getQueueDatas()){
intqueueNums=Math.min(defaultMQProducer.getDefaultTopicQueueNums(),data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}else{
topicRouteData=this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000*3);
}
2、如果路由信息找到,与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变, 如果未发生变化,则直接返回 false。
TopicRouteDataold=this.topicRouteTable.get(topic);
booleanchanged=topicRouteDataIsChange(old,topicRouteData);
if(!changed){
changed=this.isNeedUpdateTopicRouteInfo(topic);
}else{
log.info("thetopic[{}]routeinfochanged,old[{}],new[{}]",topic,old,topicRouteData);
}
3、更新 MQClientlnstance Broker 地址缓存表
4、根据 topicRouteData 中的 List 转换成topicPublishInfo的 List 列表,其具体实现在 topicRouteData2TopicPublishInfo, 然后会更新该 MQClientlnstance所管辖的所有消息发送关于 topic 的路由信息。
循环遍历路由 信息的 QueueData 信息,如果队列 没有写权 限,则继续遍历下 一个 QueueData ;根据 brokerName 找到 brokerData 信息 ,找不 到或没有找到 Master 节点,则 遍历下一个 QueueData ;根据写队列个数,根据 topic+序号 创建MessageQueue ,填充 topicPublishlnfo 的List.
至此,消息发送的路由查找就完成了。
J_Howie
发布了21 篇原创文章 · 获赞 11 · 访问量 1万+
私信
关注