热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mqtt协议broker之moqutte源码研究四之PUBLISH报文处理

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker向client发布消息

public void processPublish(Channel channel, MqttPublishMessage msg) {final MqttQoS qos = msg.fixedHeader().qosLevel();final String clientId = NettyUtils.clientID(channel);LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,msg.variableHeader().topicName(), msg.variableHeader().packetId(), qos);switch (qos) {case AT_MOST_ONCE:this.qos0PublishHandler.receivedPublishQos0(channel, msg);break;case AT_LEAST_ONCE:this.qos1PublishHandler.receivedPublishQos1(channel, msg);break;case EXACTLY_ONCE:this.qos2PublishHandler.receivedPublishQos2(channel, msg);break;default:LOG.error("Unknown QoS-Type:{}", qos);break;}
}

根据发布的消息的qos,用不同的QosPublishHandler来处理,QosPublishHandler有三个具体实现,分别是Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler.
这里面先讲解Qos1PublishHandler的处理

void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {// verify if topic can be writefinal Topic topic = new Topic(msg.variableHeader().topicName());String clientID = NettyUtils.clientID(channel);String username = NettyUtils.userName(channel);if (!m_authorizator.canWrite(topic, username, clientID)) {LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);return;}final int messageID = msg.variableHeader().packetId();// route message to subscribersIMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);toStoreMsg.setClientID(clientID);this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);sendPubAck(clientID, messageID);if (msg.fixedHeader().isRetain()) {if (!msg.payload().isReadable()) {m_messagesStore.cleanRetained(topic);} else {// before wasn't storedm_messagesStore.storeRetained(topic, toStoreMsg);}}// 修改publish消息,slice出的ByteBuf对象,原文中存在内存泄漏MoquetteMessage moquetteMessage = new MoquetteMessage(msg.fixedHeader(), msg.variableHeader(), msg.content());m_interceptor.notifyTopicPublished(moquetteMessage, clientID, username);msg.content().release();
}1.鉴权,该client下的username是否有对该topic发布消息(对topic写)的权限。2.创建一个IMessagesStore.StoredMessage,同时把消息推送给所有该对该消息的订阅者。if (LOG.isTraceEnabled()) {LOG.trace("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}, payload={}, " +"subscriptionTree={}", pubMsg.getClientID(), topic, messageID, DebugUtils.payload2Str(pubMsg.getPayload()),subscriptions.dumpTree());} else if(LOG.isInfoEnabled()){LOG.info("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}", pubMsg.getClientID(), topic,messageID);}publish2Subscribers(pubMsg, topic);判断是否是跟踪模式,如果是的话,会把当前所有的订阅关系打印到日志,由于这个需要遍历topic树,消耗比较大,所以是可配置的,在moquette.cof里面配置。
核心的处理逻辑在下面,接着往下看void publish2Subscribers(IMessagesStore.StoredMessage pubMsg, Topic topic) {List topicMatchingSubscriptions = subscriptions.matches(topic);final String topic1 = pubMsg.getTopic();final MqttQoS publishingQos = pubMsg.getQos();final ByteBuf origPayload = pubMsg.getPayload();for (final Subscription sub : topicMatchingSubscriptions) {MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);ClientSession targetSession = m_sessionsStore.sessionForClient(sub.getClientId());boolean targetIsActive = this.connectionDescriptors.isConnected(sub.getClientId());
//TODO move all this logic into messageSender, which puts into the flightZone only the messages that pull out of the queue.if (targetIsActive) {if(LOG.isDebugEnabled()){LOG.debug("Sending PUBLISH message to active subscriber. CId={}, topicFilter={}, qos={}",sub.getClientId(), sub.getTopicFilter(), qos);}// we need to retain because duplicate only copy r/w indexes and don't retain() causing// refCnt = 0ByteBuf payload = origPayload.retainedDuplicate();MqttPublishMessage publishMsg;if (qos != MqttQoS.AT_MOST_ONCE) {// QoS 1 or 2int messageId = targetSession.inFlightAckWaiting(pubMsg);// set the PacketIdentifier only for QoS > 0publishMsg = notRetainedPublishWithMessageId(topic1, qos, payload, messageId);} else {publishMsg = notRetainedPublish(topic1, qos, payload);}this.messageSender.sendPublish(targetSession, publishMsg);} else {if (!targetSession.isCleanSession()) {if(LOG.isDebugEnabled()){LOG.debug("Storing pending PUBLISH inactive message. CId={}, topicFilter={}, qos={}",sub.getClientId(), sub.getTopicFilter(), qos);}// store the message in targetSession queue to delivertargetSession.enqueue(pubMsg);}}}
}

大概分为以下几步
2.1.根据topic找出匹配的订阅集合list,这里面由于涉及到比较大的计算,所以单独讲解

public List matches(Topic topic) {Queue tokenQueue &#61; new LinkedBlockingDeque<>(topic.getTokens());List matchingSubs &#61; new ArrayList<>();subscriptions.get().matches(tokenQueue, matchingSubs);// 客户端使用带通配符的主题过滤器请求订阅时&#xff0c;客户端的订阅可能会重复&#xff0c;因此发布的消息可能会匹配多个过滤器。对于这种情//况&#xff0c;服务端必须将消息分发给所有订阅匹配的QoS等级最高的客户端。服务端之后可以按照订阅的QoS等级&#xff0c;分发消息的副本给每//一个匹配的订阅者。Map subsForClient &#61; new HashMap<>();for (ClientTopicCouple matchingCouple : matchingSubs) {Subscription existingSub &#61; subsForClient.get(matchingCouple.clientID);Subscription sub &#61; this.subscriptionsStore.getSubscription(matchingCouple);if (sub &#61;&#61; null) {// if the m_sessionStore hasn&#39;t the sub because the client disconnectedcontinue;}// update the selected subscriptions if not present or if has a greater qosif (existingSub &#61;&#61; null || existingSub.getRequestedQos().value() (subsForClient.values());
}
可以看的出来&#xff0c;会先创建一个队列&#xff0c;存储topic的层级比如/a/b/c&#xff0c;队列里面就会有三个运输[c,b,a] 这里面之所以要用到队列而不是&#xff0c;list就是因为后面进行匹配的时候需要确保先从第一个层级开始匹配&#xff0c;而不是最后一个
void matches(Queue tokens, List matchingSubs) {Token t &#61; tokens.poll();// check if t is null <&#61;> tokens finishedif (t &#61;&#61; null) {matchingSubs.addAll(m_subscriptions);// check if it has got a MULTI child and add its subscriptionsfor (TreeNode n : m_children) {if (n.getToken() &#61;&#61; Token.MULTI || n.getToken() &#61;&#61; Token.SINGLE) {matchingSubs.addAll(n.subscriptions());}}return;}// we are on MULTI, than add subscriptions and returnif (m_token &#61;&#61; Token.MULTI) {matchingSubs.addAll(m_subscriptions);return;}for (TreeNode n : m_children) {if (n.getToken().match(t)) {// Create a copy of token, else if navigate 2 sibling it// consumes 2 elements on the queue instead of onen.matches(new LinkedBlockingQueue<>(tokens), matchingSubs);// TODO don&#39;t create a copy n.matches(tokens, matchingSubs);}}
}

这段代码不好理解&#xff0c;涉及到迭代topic树io.moquette.spi.impl.subscriptions.TreeNode&#xff0c;下面以一个图说明
另外对topic匹配规则不熟悉的同学可以看一下这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md
假如有A,B,C,D, E五个client&#xff0c;其中A订阅了/test/#&#xff0c;B订阅了/test/hello/#&#xff0c;C 订阅了/test/hello/beijing,
D订阅了/test/&#43;/hello&#xff0c;现在E向topic-name为/test/hello/shanghai发布了一条消息请问哪几个client应该收到这条消息。
向画出topic树如下&#xff08;请原谅我手画&#xff09;
mqtt协议-broker之moqutte源码研究四之PUBLISH报文处理
先分析 E发布消息的整个过程&#xff1a;
2.1.1./test/hello/shanghai被放入queue&#xff0c;取出来的顺序依次为test&#xff0c;hello&#xff0c;shanghai
2.1.2.第一轮先匹配test&#xff0c;test不为空&#xff0c;RootNode(其实是null)不为#&#xff0c;执行到遍历RootNode下的子节点&#xff0c;RootNode先的子节点只有一个&#xff0c;test&#xff0c;test.equals(test)&#xff0c;然后当前treenode变为test&#xff0c;
2.1.3.从queue里面取出hello&#xff0c;hello不为空&#xff0c;test不为#&#xff0c;遍历test这个treenode的子节点&#xff0c;test有三个子节点&#xff0c;分别是&#xff08;#&#xff0c;&#43;&#xff0c;hello&#xff09;
2.1. 3.1 子节点是#&#xff0c;# .mathcs(hello)&#xff0c;当前节点是#&#xff0c;然后从队列里面取出shanghai&#xff0c;shanghai不为空&#xff0c;#为#&#xff0c;当前迭代终止&#xff0c;节点A匹配放入匹配的list&#xff0c;
2.1.3.2 子节点是&#43;&#xff0c;&#43; .mathcs(hello)当前节点是&#43;&#xff0c;然后从队列里面取出shanghai&#xff08;这里可能有的同学有疑问&#xff0c;为什么还能取出shanghai呢&#xff0c;因为进行下一级迭代的时候是new的新的queue&#xff09;&#xff0c;上海不为空&#xff0c;&#43;不为#所以不匹配&#xff0c;接着匹配&#43;这个treenode的子节点&#xff0c;&#43;只有一个子节点
2.1. 3.2.1 当前节点是hello&#xff0c;hello.mathcs(shanghai)不成立&#xff0c;迭代终止&#xff0c;所以D不会放入匹配的list
1.3.3 子节点是hello&#xff0c;hello.equals(hello)&#xff0c;当前节点变成hello&#xff0c;然后从队列里面取出shanghai&#xff0c;shanghai不为空&#xff0c;hello不为#&#xff0c;遍历hello这个treenode下的子节点&#xff0c;hello先有两个子节点&#xff0c;分别是&#xff08;#&#xff0c;beijing&#xff09;&#xff0c;
2.1.3.3.1 子节点是#&#xff0c;# .mathcs(hello)&#xff0c;当前节点是#&#xff0c;从队列里面取出的是空&#xff0c;所以直接会走第一个if分支&#xff0c;并且把B放入匹配的list&#xff0c;并且退出方法。
2.1. 3.3.2 子节点是beijing&#xff0c;beijing.equals&#xff08;shanghai&#xff09;不成立&#xff0c;退出迭代
最终能够匹配成功的只有A,B这两个client
也就说说能够被成功匹配&#xff0c;要么/test/hello/shanghai的每一层级都能成功匹配&#xff0c;“&#43;”能够够任意的单个层级&#xff0c;或者某一个层级是“#”&#xff08;分别对应上面的两个if分支&#xff09;
到此位置&#xff0c;有topic-name匹配由topic-filters组成的topic树的整个过程分析完成了。下面接着回到上面的publish2Subscribers的这个方法讲解匹配出client之后的动作

2.2 遍历找出的匹配的client,确定qos&#xff0c;qos取订阅请求要求的qos和发布的消息自身的qos的最小值&#xff0c;这个是mqtt协议自身规定的&#xff0c;之所以这样规定是因为&#xff0c;订阅的时候其基本单位是某个topic&#xff0c;订阅者只能订阅一个topic&#xff0c;而不能订阅一个消息&#xff0c;而发布消息的基本单位是一个消息&#xff0c;一个topic下可以有很多个消息&#xff0c;不同的消息可以有不同的qos&#xff0c;所以这里面&#xff0c;真正的qos是由订阅方和发布方共同决定的&#xff0c;出于性能的考虑&#xff0c;去最小的qos
2.3 根据连接描述符&#xff0c;判断是否client依然在线&#xff0c;如果不在线&#xff0c;且客户端要求保留会话&#xff0c;则把消息保存到该client的session的BlockingQueue&#xff0c;以待client再次上线之后&#xff0c;再次发送给该client&#xff0c;这个对应着在建立连接的时候有一个republish的动作&#xff0c;具体看https://blog.51cto.com/13579730/2073630的 第10步
2.3.如果在线&#xff0c;根据qos做不同的处理&#xff0c;如果qos是0&#xff0c;比较简单&#xff0c;之间发送&#xff0c;qos是1或者2&#xff0c;则会先把消息放入outboundFlightZone&#xff0c;产生一个messageId&#xff0c;再通过PersistentQueueMessageSender进行发送
io.moquette.spi.impl.PersistentQueueMessageSender#sendPublish&#xff0c;具体的分发逻辑比较简单&#xff0c;这里不详细讲解。类之间的关系是
ProtocolProcessor--》qos1PublishHandler--》MessagesPublisher--》PersistentQueueMessageSender&#xff0c;基本的逻辑就是通过ConnectionDescriptorStore进行发送&#xff0c;对于发送失败的要求保存会话的qos1或者2消息&#xff0c;将会继续保留&#xff0c;知道重复成功

到此publish2Subscribers方法即发送的核心逻辑讲解完了&#xff0c;让我们回到io.moquette.spi.impl.Qos1PublishHandler#receivedPublishQos1这个方法&#xff0c;

3.发送PUBACK消息&#xff0c;
4.如果是retain消息&#xff0c;但是有没有paylod&#xff0c;将该topic下的retain消息清除掉&#xff0c;可以理解成是客户端主动要求清除的&#xff0c;因为它发送了一个空的消息&#xff0c;如果有payload&#xff0c;则存储retain消息&#xff0c;对于保留消息&#xff0c;详细看这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/0303-PUBLISH.md 我简单总结一下&#xff0c;1.每个topic下永远只会存储一条retain消息&#xff0c;如果发送了两条&#xff0c;那么后面的将会将前面的覆盖&#xff1b;2.如果客户端发送了一个零字节的retain消息&#xff0c;broker将会清理调该topic下的retain消息&#xff0c;因为broker不会存储零字节的retain消息&#xff1b;3.服务端的保留消息不是会话状态的组成部分。服务端应该保留那种消息直到客户端删除它。4.当client收到broker发送的一个retain消息是&#xff0c;可以理解成这是client新建立的一个订阅的第一条消息。
5.唤醒拦截器

到此moquette对PUBLISH报文的处理讲解完了&#xff0c;这里面只讲解了qos1的处理&#xff0c;是因为qos0处理比较简单&#xff0c;而qos2我们没什么应用场景。另外这里说明一下比较容易混淆的概念cleanSession与retain消息
1.retain消息并不是session的一部分&#xff0c;它不与client挂钩&#xff0c;而是与topic-filter挂钩。也就是说当发布这个retain消息的clientsession不存在了&#xff0c;但是retain消息依然存在&#xff0c;除非有client主动删除它&#xff0c;
2.对于要求保留会话的client&#xff0c;会存在一个broker主动重新发送消息的过程&#xff0c;这个动作实在client重新建立连接的时候&#xff0c;具体看这里https://blog.51cto.com/13579730/2073630 的第10步&#xff0c;这是因为broker有责任对要求保留会话的client重新发送qos1与qos2消息
3.对于client发布订阅的时候&#xff0c;broker也会有一个主动发送消息的过程&#xff0c;具体看这里https://blog.51cto.com/13579730/2073914 的第8步
4.qos1消息和qos2是在消息发送失败或者&#xff0c;client不在线的时候&#xff0c;存储到clientsession里面的一个BlockingQueue里面的&#xff0c;而retain消息是在&#xff0c;broker收到的时候直接存储到IMessagesStore进行存储的&#xff0c;其底层是个Map

转:https://blog.51cto.com/13579730/2074290



推荐阅读
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 推荐系统遇上深度学习(十七)详解推荐系统中的常用评测指标
    原创:石晓文小小挖掘机2018-06-18笔者是一个痴迷于挖掘数据中的价值的学习人,希望在平日的工作学习中,挖掘数据的价值, ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了使用C++Builder实现获取USB优盘序列号的方法,包括相关的代码和说明。通过该方法,可以获取指定盘符的USB优盘序列号,并将其存放在缓冲中。该方法可以在Windows系统中有效地获取USB优盘序列号,并且适用于C++Builder开发环境。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • 本文概述了JNI的原理以及常用方法。JNI提供了一种Java字节码调用C/C++的解决方案,但引用类型不能直接在Native层使用,需要进行类型转化。多维数组(包括二维数组)都是引用类型,需要使用jobjectArray类型来存取其值。此外,由于Java支持函数重载,根据函数名无法找到对应的JNI函数,因此介绍了JNI函数签名信息的解决方案。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有