热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

我的mqtt协议和emqttd开源项目个人理解(3)客户端publish消息QoS==0的源码分析

学习mqtt协议和emqttd开源项目http:emqtt.comemqttd源码版本号是v1.1.3。http:emqtt.comdownloads1113源码写得比较绕

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步。call对应的是handle_info,cast对应的是handle_cast。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,packet_opts = PacketOpts,proto_state = ProtoState}) ->


2、-module(emqttd_parser).

parse(<>, {none, Limit}) ->

3、-module(emqttd_protocol).

解析PUBLISH消息

received(Packet &#61; ?PACKET(_Type), State) ->
process(Packet &#61; ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet &#61; ?PUBLISH_PACKET(?QOS_0, _PacketId),#proto_state{client_id &#61; ClientId, username &#61; Username, session &#61; Session}) ->


4、-module(emqttd_session).

publish(_SessPid, Msg &#61; #mqtt_message{qos &#61; ?QOS_0}) ->

5、-module(emqttd).

publish(Msg) when is_record(Msg, mqtt_message) ->

6、-module(emqttd_server).

publish(Msg &#61; #mqtt_message{from &#61; From}) ->

7、-module(emqttd_pubsub).

这里考虑了本机节点和集群节点的情况

publish(Topic, Msg) ->lists:foreach(fun(#mqtt_route{topic &#61; To, node &#61; Node}) when Node &#61;:&#61; node() ->?MODULE:dispatch(To, Msg);(#mqtt_route{topic &#61; To, node &#61; Node}) ->rpc:cast(Node, ?MODULE, dispatch, [To, Msg])end, emqttd_router:lookup(Topic)).


dispatch(Topic, Msg) ->
SubPid ! {dispatch, Topic, Msg};


8、-module(emqttd_session).

handle_info({dispatch, Topic, Msg}, Session &#61; #session{subscriptions &#61; Subscriptions})when is_record(Msg, mqtt_message) ->这里会区分Client是否在线离线&#xff0c;还有CleanSession&#61;0/QoS&#61;1,2的情况&#xff0c;需要存储离线消息。

%% Queue message if client disconnected
dispatch(Msg, Session &#61; #session{client_pid &#61; undefined, message_queue &#61; Q}) ->hibernate(Session#session{message_queue &#61; emqttd_mqueue:in(Msg, Q)});%% Deliver qos0 message directly to client
dispatch(Msg &#61; #mqtt_message{qos &#61; ?QOS0}, Session &#61; #session{client_pid &#61; ClientPid}) ->ClientPid ! {deliver, Msg},hibernate(Session);dispatch(Msg &#61; #mqtt_message{qos &#61; QoS}, Session &#61; #session{message_queue &#61; MsgQ})when QoS &#61;:&#61; ?QOS1 orelse QoS &#61;:&#61; ?QOS2 ->case check_inflight(Session) oftrue ->noreply(deliver(Msg, Session));false ->hibernate(Session#session{message_queue &#61; emqttd_mqueue:in(Msg, MsgQ)})end.


9、-module(emqttd_client).

handle_info({deliver, Message}, State) ->with_proto_state(fun(ProtoState) ->emqttd_protocol:send(Message, ProtoState)end, State);

10、-module(emqttd_protocol).

send(Packet, State &#61; #proto_state{sendfun &#61; SendFun})when is_record(Packet, mqtt_packet) ->


推荐阅读
  • Servlet多用户登录时HttpSession会话信息覆盖问题的解决方案
    本文讨论了在Servlet多用户登录时可能出现的HttpSession会话信息覆盖问题,并提供了解决方案。通过分析JSESSIONID的作用机制和编码方式,我们可以得出每个HttpSession对象都是通过客户端发送的唯一JSESSIONID来识别的,因此无需担心会话信息被覆盖的问题。需要注意的是,本文讨论的是多个客户端级别上的多用户登录,而非同一个浏览器级别上的多用户登录。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • WebSocket与Socket.io的理解
    WebSocketprotocol是HTML5一种新的协议。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文探讨了在设置了HTTP客户端超时时间后,向HTTP服务器发送请求时出现两个请求的情况。其中一个请求正常,另一个请求无法获取请求参数。文章分析了可能导致此问题的原因,并提供了解决方案。 ... [详细]
  • LVS实现负载均衡的原理LVS负载均衡负载均衡集群是LoadBalance集群。是一种将网络上的访问流量分布于各个节点,以降低服务器压力,更好的向客户端 ... [详细]
  • 本文介绍了如何使用go语言实现一个一对一的聊天服务器和客户端,包括服务器开启、等待客户端连接、关闭连接等操作。同时提供了一个相关的多人聊天的链接供参考。 ... [详细]
  • 在单位的一台4cpu的服务器上部署了esxserver,挂载了6个虚拟机,目前运行正常。在安装部署过程中,得到了cnvz.net论坛精华区 ... [详细]
author-avatar
Jaaaaasonnv_116
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有