作者:Jaaaaasonnv_116 | 来源:互联网 | 2023-08-19 00:08
学习mqtt协议和emqttd开源项目：http://emqtt.com/ 。emqttd源码版本号是v1.1.3：http://emqtt.com/downloads/1113 。源码写得比较绕
学习mqtt协议和emqttd开源项目http://emqtt.com/
emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113
源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:
(注意:gen_server:call是同步调用,cast是异步调用。call对应的是handle_call,cast对应的是handle_cast;handle_info处理的是其他普通消息,例如socket数据或进程间用 ! 直接发送的消息。)
1、-module(emqttd_client).
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,packet_opts = PacketOpts,proto_state = ProtoState}) ->
2、-module(emqttd_parser).
parse(<>, {none, Limit}) ->
3、-module(emqttd_protocol).
解析PUBLISH消息
received(Packet = ?PACKET(_Type), State) ->
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), #proto_state{client_id = ClientId, username = Username, session = Session}) ->
4、-module(emqttd_session).
publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
5、-module(emqttd).
publish(Msg) when is_record(Msg, mqtt_message) ->
6、-module(emqttd_server).
publish(Msg = #mqtt_message{from = From}) ->
7、-module(emqttd_pubsub).
这里考虑了本机节点和集群节点的情况
%% @doc Dispatch a published message along every route found for Topic.
%%
%% Routes are obtained from emqttd_router:lookup/1. A route whose node is
%% the local node is dispatched by calling ?MODULE:dispatch/2 directly; a
%% route on any other node is dispatched there with rpc:cast/4, so the
%% caller does not wait for the remote node.
%%
%% Returns `ok' (the result of lists:foreach/2).
publish(Topic, Msg) ->
    lists:foreach(
        fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
                %% Route lives on this node: dispatch in-process.
                ?MODULE:dispatch(To, Msg);
           (#mqtt_route{topic = To, node = Node}) ->
                %% Route lives on another cluster node: fire-and-forget RPC.
                rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
        end, emqttd_router:lookup(Topic)).
dispatch(Topic, Msg) ->
SubPid ! {dispatch, Topic, Msg};
8、-module(emqttd_session).
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) when is_record(Msg, mqtt_message) ->
这里会区分Client在线还是离线，还有CleanSession=0/QoS=1,2的情况，需要存储离线消息。
%% @doc Dispatch a message to the session's client, or queue it.
%%
%% Clause order matters: the "no client attached" case is checked first, so
%% the later clauses only run when client_pid is set.

%% Queue message if client disconnected
dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->
    hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

%% Deliver qos0 message directly to client
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg},
    hibernate(Session);

%% QoS1/QoS2: deliver now when check_inflight/1 returns true, otherwise
%% put the message on the session's message queue.
%% NOTE(review): presumably check_inflight/1 reports whether the inflight
%% window still has room -- confirm against its definition.
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
  when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
    case check_inflight(Session) of
        true ->
            noreply(deliver(Msg, Session));
        false ->
            hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
    end.
9、-module(emqttd_client).
handle_info({deliver, Message}, State) ->with_proto_state(fun(ProtoState) ->emqttd_protocol:send(Message, ProtoState)end, State);
10、-module(emqttd_protocol).
send(Packet, State = #proto_state{sendfun = SendFun}) when is_record(Packet, mqtt_packet) ->