开发十年,就只剩下这套架构体系了! >>>
熟悉TCP编程的读者可能都会知道,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。这里,首先讲述下基本知识,然后模拟一个没有考虑TCP粘包/拆包导致功能异常的案例,帮助大家进行分析。
一、基础知识
问题说明
通常,会发生上图中的4种情况:
1 正常 2 粘包 3、4 拆包
原因:
1 应用程序写入的字节大小大于套接口发送缓冲区大小;
2 进行MSS大小的TCP分段;
3 以太网帧的payload大于MTU进行IP分片。
解决方案:
由于底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决归纳如下:
1 消息定长
2 在包尾增加回车换行符进行分割,例如FTP协议
3 message分为消息头和消息体
4 更复杂的应用层协议
二、没有考虑TCP粘包/拆包导致功能异常的案例
在功能测试时往往没有问题,但是压力测试过程中,问题就会暴露出来。如果代码没有考虑,往往就会出现解码错位或者错误,导致程序不能工作。
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8").substring(0, req.length
- System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + body
+ " ; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
private int counter;
private byte[] req;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator").getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf message = null;
for (int i=0;i<100;i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
byte.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.warning("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
服务端运行结果显示只发送了两条请求消息,因而客户端理应也返回了两条“BAD ORDER”应答消息,但是
事实是客户端只返回了一条包含两个“BAD ORDER”指令的消息,说明服务端和客户端都发生了粘包。
三、Netty是如何解决TCP粘包问题的
Netty默认提供了多种编解码器用于处理半包。
支持TCP粘包的TimeServer
public class TimeServer {
public void bind(int port) throws Exception {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if(args!=null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch(NumberFormatException e) {
}
}
new TimeServer().bind(port);
}
}
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + " ; the counter is : " + ++ counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
运行结果完全符合预期,说明通过使用LineBasedFrameDecoder和StringDecoder成功解决了TCP粘包导致的读半包问题。
只要将支持半包解码的Handler添加到ChannelPipeline中即可,不需要写额外的代码,用起来很方便。
四、LineBasedFrameDecoder和StringDecoder的原理分析
简单地来说,LineBasedFrameDecoder是以换行符为结束标志的解码器,同事支持配置当行的最大长度。如果连续读取到最大长度house仍然没有发现换行符,就会抛出异常,同时忽略之前读到的异常码流。
而StringDecoder是将接收到的对象转换成字符串,然后继续调用后面的Handler。