
Pitfalls of Using clientId in RocketMQ


 

Test Environment Deployment Architecture

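Two RocketMQ clusters are deployed. The topic dw_test_01 is created on the DefaultCluster cluster, and the topic dw_test_02 is created on DefaultClusterb. The requirement is that order-service-app must send messages to both dw_test_01 and dw_test_02. The sample code is as follows: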

public static void main(String[] args) throws Exception {
    // Create the first producer (cluster 1)
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group1");
    producer.setNamesrvAddr("192.168.3.10:9876");
    producer.start();

    // Create the second producer (cluster 2)
    DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
    producer2.setNamesrvAddr("192.168.3.19:9876");
    producer2.start();

    try {
        // Send a message to the first RocketMQ cluster
        SendResult result1 = producer.send(new Message("dw_test_01", "hello 192.168.3.10 nameserver".getBytes()));
        System.out.printf("%s%n", result1);
    } catch (Throwable e) {
        System.out.println("-----first------------");
        e.printStackTrace();
        System.out.println("-----first------------");
    }

    try {
        // Send a message to the second RocketMQ cluster
        SendResult result2 = producer2.send(new Message("dw_test_02", "hello 192.168.3.19 nameserver".getBytes()));
        System.out.printf("%s%n", result2);
    } catch (Throwable e) {
        System.out.println("-----second------------");
        e.printStackTrace();
        System.out.println("-----second------------");
    }

    // Sleep for 10s to delay the end of the task
    Thread.sleep(10000);
}

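Sending to cluster 2 fails with a "topic does not exist" error, even though dw_test_02 has clearly been created, and sending to dw_test_02 on cluster 2 on its own succeeds (the brokers are configured not to auto-create topics). Why does this happen?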

Tracing the Cause Through the Source Code

DefaultMQProducer 

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    // other fields and code omitted
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
        final String customizedTraceTopic) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.defaultMQProducerImpl);
                traceDispatcher = dispatcher;
                this.defaultMQProducerImpl.registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }
}

The core operations of DefaultMQProducer are delegated to its defaultMQProducerImpl field; start() likewise calls this.defaultMQProducerImpl.start() directly.

DefaultMQProducerImpl

public class DefaultMQProducerImpl implements MQProducerInner {
    private MQClientInstance mQClientFactory;

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

    private SendResult sendDefaultImpl(Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // ... (part of the loop body was lost when the page was extracted)
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout
                // ... (the listing is cut off here in the original)

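The start() method initializes the MQClientInstance field (mQClientFactory), and in sendDefaultImpl the call this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); is what actually sends the message. Next, let us focus on how the MQClientInstance is instantiated.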

MQClientManager

public class MQClientManager {
    private final static InternalLogger log = ClientLogger.getLog();
    private static MQClientManager instance = new MQClientManager();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    private ConcurrentMap<String, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

    private MQClientManager() {
    }

    public static MQClientManager getInstance() {
        return instance;
    }

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
        return getOrCreateMQClientInstance(clientConfig, null);
    }

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }
}

The factoryTable map holds every MQClientInstance in the process. The key is the clientId and the value is the MQClientInstance.

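From this we can infer that the two DefaultMQProducer objects in the example should each have had their own MQClientInstance, but because their clientId values are identical, only one MQClientInstance was created. Both producers share that single instance, which carries the first producer's configuration (including the NameServer address of cluster 1), so the send to dw_test_02 fails. Let us look at how clientId is generated.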
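To make the sharing behaviour concrete, here is a minimal sketch of the same get-or-create pattern, written without any RocketMQ dependency. FactoryTableSketch and ClientInstance are hypothetical stand-ins for MQClientManager and MQClientInstance, and the clientId strings are made-up values; only the lookup logic mirrors the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for MQClientManager; not the RocketMQ class itself.
final class FactoryTableSketch {

    // Hypothetical stand-in for MQClientInstance: it only remembers the
    // NameServer address it was created with.
    static final class ClientInstance {
        final String namesrvAddr;

        ClientInstance(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
    }

    private static final ConcurrentMap<String, ClientInstance> FACTORY_TABLE =
        new ConcurrentHashMap<>();

    // Same shape as getOrCreateMQClientInstance: the first caller for a given
    // clientId creates the instance, every later caller gets that one back.
    static ClientInstance getOrCreate(String clientId, String namesrvAddr) {
        ClientInstance instance = FACTORY_TABLE.get(clientId);
        if (instance == null) {
            instance = new ClientInstance(namesrvAddr);
            ClientInstance prev = FACTORY_TABLE.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        String clientId = "192.168.3.100@12345"; // made-up IP and PID
        ClientInstance first = getOrCreate(clientId, "192.168.3.10:9876");
        ClientInstance second = getOrCreate(clientId, "192.168.3.19:9876");
        System.out.println(first == second);     // true: one shared instance
        System.out.println(second.namesrvAddr);  // 192.168.3.10:9876
    }
}
```

The second call asks for the cluster 2 address but receives the instance built for cluster 1, which is the situation the second producer in the example ends up in.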

ClientConfig

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}

The clientId is built from the following parts:


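  • clientIp: the IP address of the client.
  • instanceName: the instance name. Its default value is DEFAULT, but when the producer starts, changeInstanceNameToPID() (see DefaultMQProducerImpl.start() above) replaces a value of DEFAULT with the PID of the process.
  • unitName: the unit name. If it is not blank, it is appended to the clientId.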
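The rule can be checked with a small self-contained sketch. ClientIdSketch is a hypothetical simplification of ClientConfig.buildMQClientId(), not the RocketMQ class; the IP address and PID are made-up values, and the blank check only approximates UtilAll.isBlank.

```java
// Hypothetical simplification of ClientConfig.buildMQClientId().
final class ClientIdSketch {

    // clientIP@instanceName, plus @unitName when a non-blank unit name is set.
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ip = "192.168.3.100"; // made-up client IP
        String pid = "12345";        // made-up PID standing in for instanceName

        // Two producers in one process: same IP, same PID, no unitName.
        String id1 = buildClientId(ip, pid, null);
        String id2 = buildClientId(ip, pid, null);
        // The second producer again, this time with a unitName set.
        String id2WithUnit = buildClientId(ip, pid, "DefaultClusterb");

        System.out.println(id1.equals(id2));          // true
        System.out.println(id1.equals(id2WithUnit));  // false
        System.out.println(id2WithUnit);              // 192.168.3.100@12345@DefaultClusterb
    }
}
```

Note that neither the producer group nor the NameServer address takes part in the id, which is why two producers pointing at different clusters can still collide.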

 

Solution

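Given the three components of clientId, I do not recommend changing instanceName. Leave it at its default value DEFAULT so that it is replaced with the process PID at runtime; that way the clientId stays unique when the same code is deployed as several processes on one machine. Instead, I suggest setting unitName, for example to the name of the cluster. The modified code looks like this: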

public static void main(String[] args) throws Exception {
    // code omitted
    DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
    producer2.setNamesrvAddr("192.168.3.19:9876");
    producer2.setUnitName("DefaultClusterb");
    producer2.start();
    // code omitted
}

 

