NameServer 是RocketMQ 消息队列的状态服务器(服务发现功能),集群中的各个服务都需要通过 NameServer 来理解集群中各个服务的状态。相当于 SpringCloud 中的 Eureka 的功能。
NameServer 中维护着 Producer 集群、Broker 集群、 Consumer 集群的服务状态。通过定时发送心跳数据包进行维护升级各个服务的状态。
当有新的Producer 加入集群时,通过上报自身的服务信息,及获取各个 Broker Master的信息(Broker 地址、Topic、Queue 等信息),这样即可以决定把对应的Topic消息存储到那个Broker、哪个Queue 上。Consumer 同理。
NameServer 可以部署多个,多个NameServer互相独立,不会交换消息。Producer、Broker、Consumer 启动的时候都需要指定多个 NameServer,各个服务的信息会同时注册到多个 NameServer 上,从而能到达高可用。
可以看出 NameServer 中的类比较少,8个类。分析起来也比较轻松。
org.apache.rocketmq.namesrv.NamesrvStartup 是 NameServer 的启动类。
通过 createNamesrvController 方法创立 NamesrvController 。
NameServer 启动时首先判断能否传入了命令行参数。
命令行参数有两个,-p 和 -c
-c 可以指定 NameServer 的配置文件,假如不指定,则使用默认值。
-p 打印 NameServer 的配置参数信息。打印完参数后退出进程。
下面是打印 NameServer 默认的配置参数信息。
1、调用NamesrvController.initialize() 初始化 NamesrvController,而后调用 NamesrvController.start() 方法来开启 NameServer 服务。
2、注册 ShutdownHookThread 服务。在 JVM 退出之前,调用 ShutdownHookThread 来进行关闭服务,释放资源。
注意:使用 kill -9 强制杀进程是不会执行 ShutdownHook 的。
public boolean initialize() { //从 /namesrv/kvConfig.json 中加载 NameServer 的配置 this.kvConfigManager.load(); //创立 Netty Server this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 创立 Netty Server 执行的线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册 NameServer 服务接受请求的解决类 this.registerProcessor(); //定时清除超时的Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定时打印 NameServer 的配置信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); ...}
HashMap > cOnfigTable= new HashMap>();
变量中。kvConfig.json 文件的默认路径为:private String kvCOnfigPath= System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
2、创立并初始化 NettyRemotingServer 。
NettyRomotingServer 是 NameServer 对外提供服务功能的。
3、创立 Netty Server 执行使用的线程池。
4、注册默认的解决类DefaultRequestProcessor,所有的请求均由该解决类的processRequest方法来解决。
5、创立一个定时清除超时的 Broker 定时任务。
每隔10秒检查一遍所有Broker的状态的定时任务,判断每一个Broker 最近两分钟能否升级过。假如没有升级则把该 Broker 的 channel 关闭(关闭该Broker
的长连接),并清理相关数据。
6、创立一个打印 NameServer 配置的定时任务。
每隔10分钟打印一次NameServer的配置参数。即KVConfigManager.configTable变量的内容。
注册接收请求的解决类。private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } }
默认注册的是 DefaultRequestProcessor 解决器。
假如设置了 NamesrvConfig.clusterTest = true,则会注册 ClusterTestRequestProcessor 解决器。
ClusterTestRequestProcessor继承DefaultRequestProcessor。
ClusterTestRequestProcessor仅重写了 getRouteInfoByTopic()方法。
判断假如获取不到 topicRouteData数据,则会去其它的NameServer 上查找该数据并返回。
通过 processRequest 方法来解决用户端发过来的请求。
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null;}
所有请求的操作说明如下:
requectcode | 说明 |
---|---|
PUT_KV_CONFIG | 向Namesrv追加KV配置 |
GET_KV_CONFIG | 从Namesrv获取KV配置 |
DELETE_KV_CONFIG | 从Namesrv获取KV配置 |
QUERY_DATA_VERSION | 获取版本信息 |
REGISTER_BROKER | 注册一个Broker,数据都是持久化的,假如存在则覆盖配置 |
UNREGISTER_BROKER | 卸载一个Broker,数据都是持久化的 |
GET_ROUTEINTO_BY_TOPIC | 根据Topic获取Broker Name、topic配置信息 |
GET_BROKER_CLUSTER_INFO | 获取注册到Name Server的所有Broker集群信息 |
WIPE_WRITE_PERM_OF_BROKER | 去掉BrokerName的写权限 |
GET_ALL_TOPIC_LIST_FROM_NAMESERVER | 从Name Server获取完整Topic列表 |
DELETE_TOPIC_IN_NAMESRV | 从Namesrv删除Topic配置 |
GET_KVLIST_BY_NAMESPACE | 通过NameSpace获取所有的KV List |
GET_TOPICS_BY_CLUSTER | 获取指定集群下的所有 topic |
GET_SYSTEM_TOPIC_LIST_FROM_NS | 获取所有系统内置 Topic 列表 |
GET_UNIT_TOPIC_LIST | 单元化相关 topic |
GET_HAS_UNIT_SUB_TOPIC_LIST | 获取含有单元化订阅组的 Topic 列表 |
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST | 获取含有单元化订阅组的非单元化 |
UPDATE_NAMESRV_CONFIG | 升级Name Server配置 |
根据 processRequest 方法分析源码,发现接收到的所有请求操作的数据都保存在 RouteInfoManager 类中,所有的操作都是对RouteInfoManager 类的操作。
public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap
private final HashMap
Map 中 key存储的是 Topic 的名称, value 存储的是 QueueData 的集合。
QueueData 的集合 size 等于 Topic 对应的 Broker Master 的个数。
QueueData 的数据结构如下:
public class QueueData implements Comparable
private final HashMap
Map 中 key 存储的是 Broker Name, value 存储的是 BrokerData 数据(Broker 的相关信息)。
BrokerData 的数据结构如下:
public class BrokerData implements Comparable
由于相同的名称的 BrokerName 可以多有个。一个 Master 和多个 Slave。所有使用 brokerAddrs 来存储相同 BrokerName 下所有的 Broker 信息(判断Master 和 Slave 的关系是通过 Master 和 Slave 名称能否相同,brokerId 为 0 的是Master, 大于0 的是 Slave)。
private final HashMap
Map 中 key存储的是 clusterName 的名称, value 存储的是 brokerName 的集合。
private final HashMap
Map 中 key 存储的是 brokerAddr 信息,value 存储的是 BrokerLiveInfo 信息,BrokerLiveInfo 中存储了 Broker 的实时状态。
class BrokerLiveInfo { // 最后升级时间 private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr;
上面详情的 NamesrvController.initialize() 中有一个schedule定时任务,每个10秒钟定时调用 scanNotActiveBroker() 方法进行扫描不活动的 Broker,并把不活动的 Broker 删除掉,就是判断的 这个 lastUpdateTimestamp 这个数据。
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
超过 2分钟没有升级这个值,就认为 Broker 不可用了。
private final HashMap
Map 中 key 存储的是 brokerAddr 信息,value 存储的是 Filter Server 信息。
Filter Server 是消息的过滤服务器,一个 Broker 可以对应多个 Filter Server。