其实本来是要讲erlang如何解决tcp粘包问题的,刚好erlang_mysql_driver里面就有关于这个问题的一种解决方式,所以干脆就以erlang_mysql_driver的源码为例来探究下该问题的解决方案。
tcp 粘包问题
mysql的协议包是建立在tcp的基础上的,而tcp协议是流协议,也就是在使用的时候可以保证按顺序收到,但是并不是对方发送多少次,我们就能接收多少次。
这就是tcp粘包问题了,例如你调用两次gen_tcp:send,发了两句话,“你好”“吃饭了吗”。可是我这边在调用gen_tcp:recv接收的时候,可能是需要调用3次,分别收到 “你”“好吃”“饭了吗”。那我就根本不知道你在说什么了。
针对上述问题的解决方案是在协议自定义的时候用len+body的形式,len所占的空间固定,比如只占一个字节,那么你就可以这样发“2你好4吃饭了吗”。我这边每次先取一个字节,得到2,然后就取后面2个数据“你好”。再次循环,取一个字节4,然后取后面4个字节“吃饭了吗”。这样我们就可以正常聊天了。(当然中文不止一个字节,我随便定的,不要纠结哈)本文要讲的就是针对这种方案的erlang具体设计了。
另外关于tcp粘包问题的概念也挺多的,有一种特别的看法是把粘包问题看成传输层的问题,然后得出的结论是tcp以前会出现“粘包问题”现在不会了。这种概念的“粘包问题”是指多进程通过同一端口发送消息如何避免乱序。这里不会对这些进行讨论,在这篇文章中我理解的tcp粘包问题是在应用层的,也就是因为tcp的字节流无边界,发送方发出N个协议包,接收方收到M个协议包的问题,N可能大于M也可能小于M。
mysql_recv是怎么处理tcp粘包
erlang_mysql_driver使用mysql_recv模块来接收mysql服务器发送来的数据,那么mysql_recv进程肯定是需要处理tcp粘包的。
mysql_recv建立连接的方式
我们先看下,mysql_recv是怎么建立连接的?
这个连接建立的过程在mysql_recv:init中
init(Host, Port, LogFun, Parent) ->case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {keepalive, true}]) of{ok, Sock} ->Parent ! {mysql_recv, self(), init, {ok, Sock}},State &#61; #state{socket &#61; Sock,parent &#61; Parent,log_fun &#61; LogFun,data &#61; <<>>},loop(State);E ->LogFun(?MODULE, ?LINE, error,fun() ->{"mysql_recv: Failed connecting to ~p:~p : ~p",[Host, Port, E]}end),Msg &#61; lists:flatten(io_lib:format("connect failed : ~p", [E])),Parent ! {mysql_recv, self(), init, {error, Msg}}end.
gen_tcp:connect(Host, Port, [binary, {packet, 0}, {keepalive, true}])
这里有个很重要的默认参数就是{active, true}&#xff0c;在不显式指定active的时候&#xff0c;默认为true。
true指定的是主动消息接收&#xff0c;也就是一旦有数据达到这个Socket&#xff0c;那么这个Socket的控制进程&#xff08;mysql_recv&#xff09;就会收到{tcp, …}, {tcp_error, …}, {tcp_closed,…}这一类的消息。
而packet这个参数&#xff0c;跟我们要讨论的tcp粘包问题关系就很大了&#xff0c;它就是erlang用来解决tcp粘包的&#xff0c;如{packet, 1}表明协议包的第一个bytes用来表示长度&#xff0c;那么我们只需要直接接收数据就行了&#xff0c;或者直接发送数据。erlang会自动解析协议包头的长度&#xff0c;但是在mysql_recv中并没有使用packet这个功能。
mysql_recv send_packet
那么mysql_recv自己是怎么解决的呢&#xff1f;我们看它收到数据后怎么做的&#xff1f;
loop(State) ->Sock &#61; State#state.socket,receive{tcp, Sock, InData} ->NewData &#61; list_to_binary([State#state.data, InData]),Rest &#61; sendpacket(State#state.parent, NewData),loop(State#state{data &#61; Rest});{tcp_error, Sock, Reason} ->LogFun &#61; State#state.log_fun,LogFun(?MODULE, ?LINE, error,fun() ->{"mysql_recv: Socket ~p closed : ~p",[Sock, Reason]}end),State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},error;{tcp_closed, Sock} ->LogFun &#61; State#state.log_fun,LogFun(?MODULE, ?LINE, debug,fun() ->{"mysql_recv: Socket ~p closed", [Sock]}end),State#state.parent ! {mysql_recv, self(), closed, normal},errorend.
每次收到数据后InData后&#xff0c;先把之前缓存在mysql_recv state中的data取出&#xff0c;然后和InData合并。NewData &#61; InData &#43; 之前缓存的Data&#xff0c;然后调用send_packet。
sendpacket(Parent, Data) ->case Data of<<Length:24/little, Num:8, D/binary>> ->ifLength &#61;<size(D) ->{Packet, Rest} &#61; split_binary(D, Length),Parent ! {mysql_recv, self(), data, Packet, Num},sendpacket(Parent, Rest);true ->Dataend;_ ->Dataend.
sendpacket并没有直接发送数据&#xff0c;sendpacket取出前面3个字节&#xff0c;作为这个包长度的值。为什么是3个字节呢&#xff1f;为什么不是2个或4个呢&#xff0c;因为这是mysql规定&#xff0c;mysql的数据包就是用前面3个字节表示长度&#xff0c;第4个字节表示序号&#xff0c;可见上面的代码匹配 Length:24, Num:8。
当我们知道这个包的长度后&#xff0c;就看后面的data够不够了&#xff0c;如果不够那就继续进入#state.data中缓存。如果够了&#xff0c;那就按Length截取出数据&#xff0c;发送给mysql_conn&#xff0c;剩下的data继续进入#state.data中缓存。
其他处理tcp粘包的方法
事实上erlang本身的{packet, N}这个参数就提供了&#xff0c;强大的tcp粘包处理。
在不考虑{packet&#xff0c; N}的情况下&#xff0c;其他处理tcp粘包的方法&#xff1a;
- mysql_recv给我们提供了一种主动接收信息{active, true}时处理粘包的方式&#xff0c;这种方式其实就是准备了一个数据缓存区&#xff08;#state.data&#xff09;&#xff0c;然后按包头定义的长度取数据&#xff0c;多的放回&#xff0c;少的继续等待。
- 使用参数{active,false}的时候&#xff0c;我们是用gen_tcp:recv(Socket, Len)的方式来接收数据的&#xff0c;只有调用了recv的时候才会去按Len指定的长度获取数据&#xff0c;那么这种方式的话&#xff0c;其实解决的方法也比较简单。每次先gen_tcp:recv(Socket, 3)&#xff0c;接收到数据后取出该数据的大小DataLen&#xff0c;然后调用gen_tcp:recv(Socket, DataLen).这次接收完数据后&#xff0c;继续gen_tcp:recv(Socket, 3)就行了。
当然上面说的两种方式&#xff0c;包括{packet, N}都是建立在我们自定义的协议包使用固定的字节数&#xff08;如mysql的3&#xff09;&#xff0c;来表示该报的长度。包的格式 &#61; Len &#43; Body&#xff0c;Len所占的字节数固定。这里的Len的字节数固定后&#xff0c;这个包的最大值其实也就知道了&#xff0c;如mysql的一个包最大为16m&#xff08;(2^24)-1&#61;(16M-1)字节&#xff09;&#xff0c;超过该值就要分包发送。