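Author: 手机用户2502887947_784 | Source: Internet | 2023-08-20 00:26
Test environment deployment architecture
Two RocketMQ clusters are deployed. The topic dw_test_01 is created on the DefaultCluster cluster, and the topic dw_test_02 is created on DefaultClusterb. The requirement is for order-service-app to send messages to both dw_test_01 and dw_test_02. The sample code is as follows: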
public static void main(String[] args) throws Exception {
    // Create the first producer
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group1");
    producer.setNamesrvAddr("192.168.3.10:9876");
    producer.start();

    // Create the second producer
    DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
    producer2.setNamesrvAddr("192.168.3.19:9876");
    producer2.start();

    try {
        // Send a message to the first RocketMQ cluster
        SendResult result1 = producer.send(new Message("dw_test_01", "hello 192.168.3.10 nameserver".getBytes()));
        System.out.printf("%s%n", result1);
    } catch (Throwable e) {
        System.out.println("-----first------------");
        e.printStackTrace();
        System.out.println("-----first------------");
    }

    try {
        // Send a message to the second RocketMQ cluster
        SendResult result2 = producer2.send(new Message("dw_test_02", "hello 192.168.3.19 nameserver".getBytes()));
        System.out.printf("%s%n", result2);
    } catch (Throwable e) {
        System.out.println("-----second------------");
        e.printStackTrace();
        System.out.println("-----second------------");
    }

    // Sleep for 10s to delay the end of the task
    Thread.sleep(10000);
}
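Sending to cluster 2 fails with a "topic does not exist" error, even though dw_test_02 has clearly been created, and sending to dw_test_02 on cluster 2 on its own does succeed (the brokers are configured not to auto-create topics). Why does this happen?
Walking through the source code to find the cause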
DefaultMQProducer
public class DefaultMQProducer extends ClientConfig implements MQProducer {
    // other fields and code omitted
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
        final String customizedTraceTopic) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.defaultMQProducerImpl);
                traceDispatcher = dispatcher;
                this.defaultMQProducerImpl.registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }
}
DefaultMQProducer delegates its core operations to its defaultMQProducerImpl field. start() likewise calls this.defaultMQProducerImpl.start() directly.
DefaultMQProducerImpl
public class DefaultMQProducerImpl implements MQProducerInner {
    private MQClientInstance mQClientFactory;

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

    private SendResult sendDefaultImpl(Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // ... code omitted
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                // ... code omitted (timeout check)
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                // ... code omitted
            }
        }
        // ... code omitted
    }
}

The start() method of DefaultMQProducerImpl initializes the MQClientInstance field (mQClientFactory), while the call this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); in sendDefaultImpl is what actually sends the message. Next, let's focus on how the MQClientInstance field is instantiated.
MQClientManager
public class MQClientManager {
    private final static InternalLogger log = ClientLogger.getLog();
    private static MQClientManager instance = new MQClientManager();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    // key: clientId, value: MQClientInstance
    private ConcurrentMap<String, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

    private MQClientManager() {
    }

    public static MQClientManager getInstance() {
        return instance;
    }

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
        return getOrCreateMQClientInstance(clientConfig, null);
    }

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }
}
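The factoryTable map holds every MQClientInstance in the process. The key is the clientId and the value is the MQClientInstance.
From this we can infer that the two DefaultMQProducer instances in the example should each have their own MQClientInstance, but because their clientId values are identical, only one MQClientInstance is created. Both producers share that single MQClientInstance underneath (configured with the first cluster's settings), so sending to the second cluster fails. Let's look at how the clientId is generated.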
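The sharing behavior inferred above can be reproduced without RocketMQ. The following is a minimal sketch, not the library's code: FakeClientInstance and FACTORY_TABLE are hypothetical stand-ins for MQClientInstance and MQClientManager.factoryTable, and the clientId value is an assumed ip@pid string. It only illustrates that a get-or-create lookup keyed by clientId silently ignores the second producer's NameServer address.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SharedClientInstanceDemo {

    /** Hypothetical stand-in for MQClientInstance: remembers the address it was created with. */
    static final class FakeClientInstance {
        final String namesrvAddr;

        FakeClientInstance(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
    }

    /** Hypothetical stand-in for MQClientManager.factoryTable, keyed by clientId. */
    static final ConcurrentMap<String, FakeClientInstance> FACTORY_TABLE = new ConcurrentHashMap<>();

    /** Same get / putIfAbsent pattern as getOrCreateMQClientInstance in the listing above. */
    static FakeClientInstance getOrCreate(String clientId, String namesrvAddr) {
        FakeClientInstance instance = FACTORY_TABLE.get(clientId);
        if (instance == null) {
            instance = new FakeClientInstance(namesrvAddr);
            FakeClientInstance prev = FACTORY_TABLE.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // another caller registered this clientId first
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        // Assumed clientId: both producers run in one process, so ip@pid is identical.
        String clientId = "192.168.3.100@12345";

        FakeClientInstance first = getOrCreate(clientId, "192.168.3.10:9876");
        FakeClientInstance second = getOrCreate(clientId, "192.168.3.19:9876");

        System.out.println(first == second);    // prints "true": one shared instance
        System.out.println(second.namesrvAddr); // prints "192.168.3.10:9876"
    }
}
```

In this sketch the second caller gets back the instance built for the first cluster, which matches the symptom: the topic lookup for dw_test_02 goes to a cluster where it does not exist.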
ClientConfig
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}
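The clientId is built from the following parts:
- clientIp: the IP address of the client.
- instanceName: the instance name. The default value is DEFAULT, but when the producer starts, an instance name of DEFAULT is automatically replaced with the process PID (see the changeInstanceNameToPID() call in DefaultMQProducerImpl.start()).
- unitName: the unit name. If it is not empty, it is appended to the clientId.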
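The composition rule in the list above can be sketched as a small standalone function. This is an illustration that mirrors the shape of buildMQClientId, not the library method itself: the class name, the IP address and the PID are assumed values, and the blank check is a plain-Java approximation of UtilAll.isBlank.

```java
public class ClientIdDemo {

    /** Builds ip@instanceName, appending @unitName only when unitName is not blank. */
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp);
        sb.append("@");
        sb.append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append("@");
            sb.append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ip = "192.168.3.100"; // assumed client IP
        String pid = "12345";        // assumed PID that replaced the DEFAULT instance name

        // Two producers in the same process with no unitName: identical ids.
        String id1 = buildClientId(ip, pid, null);
        String id2 = buildClientId(ip, pid, null);
        System.out.println(id1.equals(id2)); // prints "true"

        // Giving the second producer a unitName makes its id distinct.
        String id3 = buildClientId(ip, pid, "DefaultClusterb");
        System.out.println(id3);             // prints "192.168.3.100@12345@DefaultClusterb"
    }
}
```

Because neither the IP nor the PID differs between two producers in one JVM, unitName is the only part of the id left to tell them apart in this sketch.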
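Solution
Of the three parts of the clientId, I do not recommend changing instanceName. Leave it at its default value DEFAULT so that it is replaced by the process PID at runtime; this keeps clientIds distinct when the same code is deployed as several processes on one machine. Instead, I suggest setting unitName, for example to the name of the cluster. The modified code looks like this: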
public static void main(String[] args) throws Exception {
    // code omitted
    DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
    producer2.setNamesrvAddr("192.168.3.19:9876");
    producer2.setUnitName("DefaultClusterb");
    producer2.start();
    // code omitted
}