热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

源码分析RocketMQ之消息ACK机制(消费进度)

1、消息消费进度概述首先简要阐述一下消息消费进度:消费者订阅消息消费队列(MessageQueue),当生产者将消息负载发送到MessageQueu

1、消息消费进度概述

首先简要阐述一下消息消费进度:

消费者订阅消息消费队列(MessageQueue), 当生产者将消息负载发送到 MessageQueue 中时,消费订阅者开始消费消息,消息消费过程中,为了避免重复消费,需要一个地方存储消费进度(消费偏移量)。

消息模式主要分为集群模式、广播模式:


  • 集群模式:一条消息被集群中任何一个消费者消费。
  • 广播模式:每条消息都被每一个消费者消费。

广播模式,既然每条消息要被每一个消费者消费,则消费进度可以与消费者保存在一起,也就是本地保存,但由于集群模式下,一条消息只能被集群内的一个消费者消费,进度不能保存在消费端,只能集中保存在一个地方,比较合适的是在 Broker 端。

 


2、消息消费进度存储接口

接下来我们先分析一下消息消费进度接口:OffsetStore。

/*** Offset store interface*/
public interface OffsetStore {/*** Load** @throws MQClientException*/void load() throws MQClientException;/*** Update the offset,store it in memory** @param mq* @param offset* @param increaseOnly*/void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);/*** Get offset from local storage** @param mq* @param type* @return The fetched offset*/long readOffset(final MessageQueue mq, final ReadOffsetType type);/*** Persist all offsets,may be in local storage or remote name server** @param mqs*/void persistAll(final Set mqs);/*** Persist the offset,may be in local storage or remote name server** @param mq*/void persist(final MessageQueue mq);/*** Remove offset** @param mq*/void removeOffset(MessageQueue mq);/*** @param topic* @return The cloned offset table of given topic*/Map cloneOffsetTable(String topic);/*** @param mq* @param offset* @param isOneway*/void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException;
}

入口代码:DefaultMQPushConsumerImpl#start()。

 


推荐阅读
author-avatar
霸气的萱---_299
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有