作者:Th川_546 | 来源:互联网 | 2024-12-01 19:57
在RocketMQ中,当消息被成功拉取后,这些消息会被存储到ProcessQueue
中,随后通过调用consumeMessageService.submitConsumeRequest
方法来通知消费服务进行消费。这一过程是异步的,有效地实现了消息消费与消息拉取的解耦。
RocketMQ支持两种消费模式:并发消费和顺序消费。本文重点探讨并发消费模式,其核心处理逻辑由ConsumeMessageConcurrentlyService
服务负责。
ConsumeMessageConcurrentlyService.submitConsumeRequest
方法的主要功能是创建一个消费请求ConsumeRequest
,并将此请求提交给线程池进行处理。若单次消费的消息数量超过了预设的最大批次大小(默认值为1),则会将消息分批提交给线程池处理。
具体实现如下:
public void submitConsumeRequest(final List msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
final int cOnsumeBatchSize= this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total List msgThis = new ArrayList<>(consumeBatchSize);
for (int i = 0; i if (total msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest cOnsumeRequest= new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
ConsumeRequest.run
方法负责实际的消息消费逻辑,主要包括以下几个步骤:
- 验证消息队列的有效性。如果队列已被标记为无效,则停止消费,并记录日志。
- 调用用户定义的消费监听器
MessageListenerConcurrently
来处理消息。
- 处理消费结果,根据不同的消费状态(如成功、失败或需重试)采取相应的措施。
消费结果的处理逻辑如下:
public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) {
case CONSUME_SUCCESS:
// 更新消费成功的统计信息
break;
case RECONSUME_LATER:
// 更新消费失败的统计信息
break;
default:
break;
}
// 处理不同消息模型下的消费结果
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播模式下直接丢弃失败的消息
break;
case CLUSTERING:
// 集群模式下将失败的消息重新发送回Broker
break;
default:
break;
}
// 移除已处理的消息并更新消费偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
在集群模式下,如果消息消费失败,系统会尝试将消息重新发送回Broker,并增加重试次数。如果消息再次发送失败,则会延迟一段时间后重新提交消费请求。