写在前面
通过上一个章节MQTT系列---Java端实现消息发布与订阅的介绍,我们已经基本构建出一个可以简单通信的MQTT生产和消费服务,并且具备基本的发布/订阅消息功能。那么从本章开始,我们将从源代码的角度,进一步分析一下开源中间件Eclipse.paho是如何作为MQTTClient进行工作的。在这里我们讨论的版本为Eclipse.paho的1.2.2,基于MQTT3.1.1协议
首先我们来看一下Eclipse.paho的代码结构
org.eclipse.paho.client.mqttv3:主要用于对外提供服务,即整个Eclipse Paho对外的窗口。
org.eclipse.paho.client.mqttv3.internal:提供了对mqttv3 中的接口的实现。
org.eclipse.paho.client.mqttv3.internal.nls: 国际化相关。点进去你会惊讶的发现messages_zh_CN.properties,没错,你没有看错,这货支持中文。不过学习源码时这个包不太重要,可以忽略
org.eclipse.paho.client.mqttv3.internal.security:MQTT支持SSL加密,这个包内实现了基于TLS协议的SSLSocket,这个配置起来还是有一些复杂度的,我会单独开一章详细讲。
org.eclipse.paho.client.mqttv3.internal.websocket:websocket相关实现
org.eclipse.paho.client.mqttv3.internal.wire:MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等
org.eclipse.paho.client.mqttv3.persist:发布信息持久化类。MQTT提供两种保持发布消息的方式,一种是将信息保持到文件中;一种是直接保持到内存中。
org.eclipse.paho.client.mqttv3.util:工具类。
org.eclipse.paho.client.mqttv3.logging:日志包
上面标红色的即为Eclipse paho的核心代码,所以只要把这些内容搞清楚,基本上它的核心内容和执行流程也就了解了。其实Eclipse paho还是算比较轻量级的,代码量也不算非常大。对于这种偏工具类的中间件,我个人的习惯是找到它核心的功能,顺着流程看。就Eclipse paho来说,它是对MQTTClient的实现,核心功能无非就是与服务端建立连接,发布、订阅消息。所以接下来的几个章节,我也会从这些角度来分析。对于代码的解释大部分都以注释的形式写在每行代码的前面,所以小伙伴们浏览时一定要仔细的看一看代码,避免有疏漏看不懂的地方。
PS:这里先把Eclipse Paho的核心类及其相关作用贴在前面,方便小伙伴们查阅
项目源码地址:https://github.com/eclipse/paho.mqtt.java
一、创建异步的客户端对象
MqttAsyncClient
通过上一章会了解到,当Eclipse paho作为MQTTClient生产端进行发送消息时,其主要是通过MqttPahoMessageHandler处理发送逻辑的,在与Spring-Intergration整合时,也会将MqttPahoMessageHandler的实例托管到Spring容器中。我们来简单看一下MqttPahoMessageHandler中和pulish()方法相关的的代码
public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
implements MqttCallback, ApplicationEventPublisherAware {
/**
* The default completion timeout in milliseconds.
*/
// 默认的发送完成等待超时时间
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
// 最长发送完成超时时间,默认30秒
private long completiOnTimeout= DEFAULT_COMPLETION_TIMEOUT;
@Override
protected void publish(String topic, Object mqttMessage, Message> message) {
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
try {
// 在这里创建连接并且发送消息
IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMessage) mqttMessage);
// 如果是非异步的,会阻塞等待一条消息发送完成(发送成功并且收到相应的回执,这个取决于qos)
// 如果是异步的,会通过事件回调的方式t通知发送完成,非阻塞
if (!this.async) {
token.waitForCompletion(this.completionTimeout); // NOSONAR (sync)
}
else if (this.asyncEvents && this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
getClientInstance()));
}
}
catch (MqttException e) {
throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);
}
}
}
这里小伙伴们可能会有一个疑问,这个publish()方法是在什么哪里被调用的呢?这个需要明确一下,我们之前提过Spring-Intergration类似一个消息驱动模型,他会把我们模块间的调用以一种消息通知的方式进行调用,注意看一下MqttPahoMessageHandler的继承关系
发现什么了嘛?没错,我们的消息(这里的消息不是我们生产的MQTT消息,而是Spring-Intergration消息模型中的消息)统一都会经过AbstractMessageHandler进行处理,路由到不同的出站适配器中,Eclipse.paho实现了AbstractMessageHandler对应的处理消息的方法,即在这里创建连接和执行发送操作。并最终调用publish()方法进行发送。所以上述代码中最核心的代码如下
IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMessage) mqttMessage);
即两步操作
检查连接
发送消息
下面我们分别介绍
检查连接
废话不说,直接看代码
private synchronized IMqttAsyncClient checkConnection() throws MqttException {
// 检查客户端对象是否是连接状态或client对象非空
// 是 -> 将客户端关闭,释放资源
if (this.client != null && !this.client.isConnected()) {
this.client.setCallback(null);
this.client.close();
this.client = null;
}
// 如果客户端对象为空,获取异步客户端对象,并用创建好的连接对象进行连接操作
if (this.client == null) {
try {
MqttConnectOptions cOnnectionOptions= this.clientFactory.getConnectionOptions();
Assert.state(this.getUrl() != null || connectionOptions.getServerURIs() != null,
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
// 通过客户端工厂调用MQTTAsyncClient的构造方法创建一个客户端实例
this.client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
// 客户端实例计数器+1
incrementClientInstance();
// 设置回调
this.client.setCallback(this);
// 通过阻塞线程建立连接,保证连接建立成功后再执行其他操作
this.client.connect(connectionOptions).waitForCompletion(this.completionTimeout);
logger.debug("Client connected");
}
catch (MqttException e) {
if (this.client != null) {
this.client.close();
this.client = null;
}
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
}
throw new MessagingException("Failed to connect", e);
}
}
return this.client;
}
在MqttAsyncClient的构造方法中,会同时创建出持久化对象,即MemoryPersistence的实例和发送接收消息处理器ClientComms的实例
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence,
MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
// Count characters, surrogate pairs count as one character.
// clientId长度计算
int clientIdLength = 0;
for (int i = 0; i if (Character_isHighSurrogate(clientId.charAt(i))) i++; clientIdLength++; } // 如果长度 > 65536 处理异常 if (clientIdLength > 65535) { throw new IllegalArgumentException("ClientId longer than 65535 characters"); } // 检查连接Broker URL的合法性 NetworkModuleService.validateURI(serverURI); this.serverURI = serverURI; this.clientId = clientId; this.persistence = persistence; // 创建持久化对象,如果未设置,默认使用内存持久化 if (this.persistence == null) { this.persistence = new MemoryPersistence(); } this.executorService = executorService; // 创建持久化对象的容器 -> 即HashTable this.persistence.open(clientId, serverURI); // 创建消息发送,接收处理器对象 this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService); // 清空内存持久容器数据 this.persistence.close(); this.topics = new Hashtable(); } 这里要注意一下,Eclipse.paho有两种持久化策略 内存持久化 文件持久化 分别对应MqttClientPersistence接口的两个不同的实现MemoryPersistence和MqttDefaultFilePersistence,默认使用内存持久化进行存储,如果使用文件存储,可以显式指定。通过指定文件路径的方式设置文件存储位置。 我们再来看一下ClientComms初始化的过程 public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException { // 设置当前状态为未连接状态 this.cOnState= DISCONNECTED; this.client = client; this.persistence = persistence; // 心跳包发送器,在创建MqttAsyncClient时创建的,类型为TimerPingSender this.pingSender = pingSender; // 心跳包发送器初始化 this.pingSender.init(this); this.executorService = executorService; // 创建用于追踪发送消息的追踪器对象 this.tokenStore = new CommsTokenStore(getClient().getClientId()); // 创建回调对象 this.callback = new CommsCallback(this); // 初始化消息状态处理对象。 this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender); callback.setClientState(clientState); } 这里有两个值得注意的地方 pingSender:Eclipse paho维护了自己的一套Client和 Broker探活的心跳机制,使用到了上述代码中提到的TimerPingSender,简单来说就是通过定时任务的方式不断的向Broker发送心跳包,如果在一段时间内不交互或者心跳无应答,即为异常,这是客户端会主动断开与Broker的连接。这个在后面的章节中会详细介绍,这里暂时不做展开 ClientState:保存正在发布的消息和将要发布的消息的状态信息,并对对应状态下的消息进行必要的处理,具体的处理方式会根据消息的质量(QOS)有所不同 在设置CommsCallback对象时,因为Eclipse paho集成了发布和订阅消息的功能,所以这里也会初始化两个队列用于存放接受消息的队列messageQueue和CompleteQueue。 不论对于发送消息,还是订阅消息来说,ClientState都是非常重要的,所以下面我们再看一下ClientState初始化的构造方法 protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore, CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException { // messageId使用记录映射 inUseMsgIds = new Hashtable(); // 特殊消息处理队列 pendingFlows = new Vector(); // qos2类型消息的出站队列 outboundQoS2 = new Hashtable(); // qos1类型消息的出站队列 outboundQoS1 = new Hashtable(); // qos0类型消息的出站队列 outboundQoS0 = new Hashtable(); // qos2类型消息的入站队列 inboundQoS2 = new Hashtable(); // 初始化PingREQ对象,用于发送心跳包 pingCommand = new MqttPingReq(); // 针对qos2的消息,用于记录发出Rel但未收到Comp响应的计数器 inFlightPubRels = 0; // 实际正在发出或接受,但未完成(针对qos1或qos2)回执处理的消息计数 actualInFlight = 0; this.persistence = persistence; this.callback = callback; this.tokenStore = tokenStore; this.clientComms = clientComms; this.pingSender = pingSender; // 加载持久化的数据. restoreState(); } 这里有两个值得注意的地方 这里一共初始化了6个队列,消息会在不同的阶段在各个队列中进行流转,这里暂时先不展开,之后会有章节专门介绍发送和接受消息的内部实现,介是会针对这些队列的作用做详细的说明 在初始化方法最后,调用了restoreState()方法,这个方法的主要作用简单来说就是将持久化到内存或者文件的数据重新加载到出站队列并进行发送操作,这里只说一下为什么会有这种机制,内部实现同样的等介绍发送和接受消息章节统一来解释。之所以有这样的机制,是因为当消息的qos设置为1或2时,发布者和broker,broker和订阅者之间需要通过非publish消息进行回执通信,这时如果网络连接失败或者网络抖动导致未收到回执,需要客户端重新发送这些消息以保证消息的可达性。再比如我们在发送消息时,可能因为业务峰值产生大量消息,此时消息会挤压在队列中,等待被依次发出,这时如果出现网络断开的情况,同样需要在恢复网络时重新进行发送。所以基于这样的一套重发机制的考虑才会在这里出现这样的操作。 至此,MQTTAsyncClient客户端对象创建完成。 二、与Broker建立连接 同样的,我们直接来看一下connect()方法的内部实现 public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException { // 判断连接状态,如果是已连接/正在连接/正在断开连接/已关闭这几种状态的话进行异常处理 if (comms.isConnected()) { throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); } if (comms.isConnecting()) { throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS); } if (comms.isDisconnecting()) { throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); } if (comms.isClosed()) { throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED); } // 获取连接信息对象,这个我们在配置配件中初始化过 if (optiOns== null) { optiOns= new MqttConnectOptions(); } this.cOnnOpts= options; this.userCOntext= userContext; final boolean automaticRecOnnect= options.isAutomaticReconnect(); // 根据不同的连接url创建网络模块 comms.setNetworkModules(createNetworkModules(serverURI, options)); comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect)); // Insert our own callback to iterate through the URIs till the connect // succeeds MqttToken userToken = new MqttToken(getClientId()); ConnectActionListener cOnnectActionListener= new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting); userToken.setActionCallback(connectActionListener); userToken.setUserContext(this); // If we are using the MqttCallbackExtended, set it on the // connectActionListener if (this.mqttCallback instanceof MqttCallbackExtended) { connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback); } comms.setNetworkModuleIndex(0); // 通过监听器建立连接 connectActionListener.connect(); return userToken; } 这里我们来看一下创建网络模块的实现 protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException { NetworkModule[] networkModules = null; String[] serverURIs = options.getServerURIs(); String[] array = null; if (serverURIs == null) { array = new String[] { address }; } else if (serverURIs.length == 0) { array = new String[] { address }; } else { array = serverURIs; } // 根据传入的url集合长度建立网络模块集合 networkModules = new NetworkModule[array.length]; for (int i = 0; i // 取出对应的url和options(连接设置),进一步创建 networkModules[i] = createNetworkModule(array[i], options); } return networkModules; } 我们继续看 // FACTORY_SERVICE_LOADER对应NetworkModuleFactory.class和NetworkModuleService.class.getClassLoader()的集合 private static final ServiceLoader FACTORY_SERVICE_LOADER = ServiceLoader.load( NetworkModuleFactory.class, NetworkModuleService.class.getClassLoader()); public static NetworkModule createInstance(String address, MqttConnectOptions options, String clientId) throws MqttException, IllegalArgumentException { try { URI brokerUri = new URI(address); applyRFC3986AuthorityPatch(brokerUri); // 获取到url的scheme String scheme = brokerUri.getScheme().toLowerCase(); // 通过scheme匹配不同的NetWorkModule实现创建不同的网络模块 for (NetworkModuleFactory factory : FACTORY_SERVICE_LOADER) { // 获取scheme,TLS对应ssl://,TCP对应tcp:// if (factory.getSupportedUriSchemes().contains(scheme)) { return factory.createNetworkModule(brokerUri, options, clientId); } } /* * To throw an IllegalArgumentException exception matches the previous behavior of * MqttConnectOptions.validateURI(String), but it would be nice to provide something more meaningful. */ throw new IllegalArgumentException(brokerUri.toString()); } catch (URISyntaxException e) { throw new IllegalArgumentException(address, e); } } 创建网络模块这里看着比较混乱,我来简单总结一下。 首先,Eclipse.paho作为Client连接时本身是支持集群模式的,连接集群模式的方式是将连接地址包装成字符串数组。 String[] hostArray = new String[]{"ssl://127.0.0.1:8883","tcp://127.0.0.1:1883"}; mqttConnectOptions.setServerURIs(hostArray); 其次,Eclipse.paho拿到地址后,要根据不同的地址创建不同的连接,当然这些连接有可能是TCP的有可能是WebSocket的,也有可能是TLS的。那么如何根据不同的地址创建不同的网络环境进行连接呢?Eclipse.paho会根据URL中不同的Scheme来选择使用不同的NetworkModuleFactory,进而创建出不同的NetworkModule,根据URL集合中对应的下标放入到NetworkModeul集合中。 网络模块创建完成,下面我们要真的去开启连接了 public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { synchronized (conLock) { if (isDisconnected() && !closePending) { // 将连接状态置为CONNECTING cOnState= CONNECTING; // 首次连接mqtt时,发送Connect控制报文。 cOnOptions= options; MqttConnect cOnnect= new MqttConnect(client.getClientId(), conOptions.getMqttVersion(), conOptions.isCleanSession(), conOptions.getKeepAliveInterval(), conOptions.getUserName(), conOptions.getPassword(), conOptions.getWillMessage(), conOptions.getWillDestination()); this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval()); this.clientState.setCleanSession(conOptions.isCleanSession()); this.clientState.setMaxInflight(conOptions.getMaxInflight()); tokenStore.open(); // 创建一个后台线程进行连接操作 ConnectBG cOnbg= new ConnectBG(this, token, connect, executorService); // 线程开启 conbg.start(); } else { if (isClosed() || closePending) { throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED); } else if (isConnecting()) { throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS); } else if (isDisconnecting()) { throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); } else { throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); } } } } 上面的代码中,我们需要注意两点 虽然是通过监听器connectActionListener创建连接,但是建立连接的过程最终还是会交给ClientComms处理 当与Broker建立连接时,需要发送一条控制报文为CONNECT类型的消息,如果连接成功,Broker会回执一条控制报文为CONNACK的消息,通知客户端已经与服务端成功建立连接。在Eclipse.paho中,MQTT官方支持的报文都被封装成对象,在发送和订阅消息时,根据一定的规则转换成对应的对象进行处理,非常的友好,下图是所有控制报文对应的消息类。 所以在这里会创建一个MqttConnect类型的消息用于建立连接。 下面是在后台线程中建立连接的过程 public void run() { Thread.currentThread().setName(threadName); MqttException mqttEx = null; try { // Reset an exception on existing delivery tokens. // This will have been set if disconnect occurred before delivery was // fully processed. MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens(); for (MqttDeliveryToken tok : toks) { tok.internalTok.setException(null); } // Save the connect token in tokenStore as failure can occur before send tokenStore.saveToken(conToken,conPacket); // Connect to the server at the network level e.g. TCP socket and then // start the background processing threads before sending the connect // packet. // 取出不同的网络模块并建立通信 NetworkModule networkModule = networkModules[networkModuleIndex]; networkModule.start(); // 创建线程用于消息接受 receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream()); // 线程执行 receiver.start("MQTT Rec: "+getClient().getClientId(), executorService); // 创建线程用于消息发送 sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream()); // 线程执行 sender.start("MQTT Snd: "+getClient().getClientId(), executorService); // 执行回调线程 callback.start("MQTT Call: "+getClient().getClientId(), executorService); // 发送Conn消息 internalSend(conPacket, conToken); } catch (MqttException ex) { mqttEx = ex; } catch (Exception ex) { mqttEx = ExceptionHelper.createMqttException(ex); } // 如果有异常,断开连接 if (mqttEx != null) { shutdownConnection(conToken, mqttEx); } } 上述代码需要注意下面几点 创建网络模块时,根据不同的协议创建不同的网络环境,以tcp为例,底层用的是socket通信,代码如下 public void start() throws IOException, MqttException { try { SocketAddress sockaddr = new InetSocketAddress(host, port); // 创建socket对象 socket = factory.createSocket(); // 通过socket进行连接 socket.connect(sockaddr, conTimeout*1000); socket.setSoTimeout(1000); } catch (ConnectException ex) { throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex); } } 发送connect消息的过程在这里不展开讲,小伙伴们只要知道这里发送了一条CONNECT类型消息即可,具体的发布/订阅内部实现我们留到下一章节详细讲解 Connect消息发送后,如果连接成功,会有一条connack类型的回执消息。这里我们也不展开讲,只是简单的看一下在订阅到connack消息后Eclipse paho又做了些什么,具体订阅的内部实现我们再次也不展开。在CommReceiver中封装了订阅消息的处理逻辑,并以后台线程的形式一直在获取订阅的数据,下面我们看下收到ack的代码 protected void notifyReceivedAck(MqttAck ack) throws MqttException { if (token == null) { // .... } else if (ack instanceof MqttConnack) { // 收到connack类型的消息,如果返回码是0,即为成功。 int rc = ((MqttConnack) ack).getReturnCode(); if (rc == 0) { synchronized (queueLock) { if (cleanSession) { clearState(); // Add the connect token back in so that users can be // notified when connect completes. tokenStore.saveToken(token,ack); } // 初始化计数器为0 inFlightPubRels = 0; actualInFlight = 0; // 将出站数据通过类型不同划分到发送队列和特殊消息队列中进行等待发送 restoreInflightMessages(); // 通知心跳发送器向broker发送心跳,并监控broker活跃 connected(); } } else { mex = ExceptionHelper.createMqttException(rc); throw mex; } clientComms.connectComplete((MqttConnack) ack, mex); notifyResult(ack, token, mex); tokenStore.removeToken(ack); // Notify the sender thread that there maybe work for it to do now synchronized (queueLock) { queueLock.notifyAll(); } } else { notifyResult(ack, token, mex); releaseMessageId(ack.getMessageId()); tokenStore.removeToken(ack); } checkQuiesceLock(); } 上述代码需要注意 出站队列作为消息的中间媒介最终都会将消息转移到待发送队列进行发送操作,这里具体不展开,只需要了解在建立连接时会把出站队列中的消息放到待发送队列和特殊消息队列中即可。 连接建立后,为了保证Client和Broker的连接状态活跃,需要定期发送控制报文为PINGREQ的消息,如果收到PINGRESP。即证明服务器与客户端连接正常。这里Eclipse paho维护了一套自己的心跳探活机制。我们同样也留在下一个章节去详细了解。 至此,整个Eclipse paho作为客户端建立与Broker的过程就已经完成了。 我们回顾下整个流程 初始化MqttAsyncClient实例 验证连接URL合法性 初始化持久化存储对象,默认使用内存持久化 初始化ClientComms实例 初始化CommsTokenStore实例 初始化CommsCallBack实例 初始化ClientState实例 。初始化对应队列 。加载持久化存储中的数据,并且放到出站队列outboundqosX中 与服务器建立连接 初始化网络模块,根据不同的url对应的scheme加载不同的网络模块 通过ClientComms建立连接 。创建MqttConn对象 。开启后台线程 。根据NeworkModule建立不同的网络环境,以tcp为例,建立socket连接 。开启线程处理发送和订阅消息 。发送MqttConn消息 。接受到MqttConnack消息 。将出站队列outboundqosX中的消息,待发送队列pendingMessage和特殊消息队列pendingFlows中 以上就是Eclipse paho作为MQTTClient与Broker建立连接的整个过程和代码分析。下一章我们主要解决本章遗留下的几个核心问题: 消息的发布、订阅内部实现 心跳机制与断线重连机制 消息在各个队列间是如何流转的 小伙伴们敬请期待吧~~~