作者:akj | 来源:互联网 | 2023-07-02 20:53
原标题:RocketMQTemplate发送带tags的消息RocketMQTemplate是RocketMQ集成到Springcloud之后提供的个方便发送消息的模板类,它是基
原标题:RocketMQTemplate发送带tags的消息
RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口。在单独使用Rock文章来源地址46576.htmletMQ的时候,发送消息使用的Message是‘org.apache.rocketmq.common.message’包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.springframework.messaging
的Message,猛一看,没办法发送带tags的消息了,其实在RocketMQ集成的时候已经解决了这个问题。
在RocketMQTemplate发送消息时,调用的方法是:
public SendResult syncSendOrderly(String destination, Message> message, String hashKey, long timeout) {
if (Obj文章来源站点https://www.yii666.com/ects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
文章来源地址46576.html throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
//在这里对消息进行了转化,将Spring的message转化为rocketmq自己的message
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
charset, destination, message);
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgIdwww.yii666.com:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
在上面的代码中,对消息进行了转化,将Spring的message转化为rocketmq自己的message,在RocketMQUtil.convertToRocketMessage方法中有个地方就是获取tags的:
String[] tempArr = destinawww.yii666.comtion.split(":", 2);
String topic = tempArr[0];
String tags = "";
if (tempArr.length > 1) {
tags = tempArr[1];
}
所以,在发送消息的时候,我们只要把tags使用":"添加到topic后面就可以了。例如:xxxx:tag1 || tag2 || tag3
来源于:RocketMQTemplate发送带tags的消息