热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty(三)

开发十年,就只剩下这套架构体系了!>>>  熟悉TCP编程的读者可能都会知道,无论是服务端

开发十年,就只剩下这套架构体系了! >>>   Netty(三)

熟悉TCP编程的读者可能都会知道,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。这里,首先讲述下基本知识,然后模拟一个没有考虑TCP粘包/拆包导致功能异常的案例,帮助大家进行分析。

一、基础知识

问题说明

Netty(三)

通常,会发生上图中的4种情况:

1  正常  2  粘包  3、4  拆包

原因:

1  应用程序写入的字节大小大于套接口发送缓冲区大小;

2  进行MSS大小的TCP分段;

3  以太网帧的payload大于MTU进行IP分片。

Netty(三)

解决方案:

由于底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决归纳如下:

1  消息定长

2  在包尾增加回车换行符进行分割,例如FTP协议

3  message分为消息头和消息体

4  更复杂的应用层协议

二、没有考虑TCP粘包/拆包导致功能异常的案例

在功能测试时往往没有问题,但是压力测试过程中,问题就会暴露出来。如果代码没有考虑,往往就会出现解码错位或者错误,导致程序不能工作。

public class TimeServerHandler extends ChannelHandlerAdapter {
    
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length 
            - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + body
            + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
        System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
public class TimeClientHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;
        for (int i=0;i<100;i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];
        byte.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

服务端运行结果显示只发送了两条请求消息,因而客户端理应也返回了两条“BAD ORDER”应答消息,但是

事实是客户端只返回了一条包含两个“BAD ORDER”指令的消息,说明服务端和客户端都发生了粘包。

三、Netty是如何解决TCP粘包问题的

Netty默认提供了多种编解码器用于处理半包。

支持TCP粘包的TimeServer

public class TimeServer {

    public void bind(int port) throws Exception {
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if(args!=null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch(NumberFormatException e) {
            }
        }
        new TimeServer().bind(port);
    }
}

public class TimeServerHandler extends ChannelHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++ counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
            System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

运行结果完全符合预期,说明通过使用LineBasedFrameDecoder和StringDecoder成功解决了TCP粘包导致的读半包问题。

只要将支持半包解码的Handler添加到ChannelPipeline中即可,不需要写额外的代码,用起来很方便。

四、LineBasedFrameDecoder和StringDecoder的原理分析

简单地来说,LineBasedFrameDecoder是以换行符为结束标志的解码器,同事支持配置当行的最大长度。如果连续读取到最大长度house仍然没有发现换行符,就会抛出异常,同时忽略之前读到的异常码流。

而StringDecoder是将接收到的对象转换成字符串,然后继续调用后面的Handler。


推荐阅读
  • 本文介绍了如何通过C#语言调用动态链接库(DLL)中的函数来实现IC卡的基本操作,包括初始化设备、设置密码模式、获取设备状态等,并详细展示了将TextBox中的数据写入IC卡的具体实现方法。 ... [详细]
  • 在1995年,Simon Plouffe 发现了一种特殊的求和方法来表示某些常数。两年后,Bailey 和 Borwein 在他们的论文中发表了这一发现,这种方法被命名为 Bailey-Borwein-Plouffe (BBP) 公式。该问题要求计算圆周率 π 的第 n 个十六进制数字。 ... [详细]
  • 本文详细介绍了 `org.apache.tinkerpop.gremlin.structure.VertexProperty` 类中的 `key()` 方法,并提供了多个实际应用的代码示例。通过这些示例,读者可以更好地理解该方法在图数据库操作中的具体用途。 ... [详细]
  • JUnit下的测试和suite
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 本文深入探讨了Go语言中的接口型函数,通过实例分析其灵活性和强大功能,帮助开发者更好地理解和运用这一特性。 ... [详细]
  • Java中如何判断一个对象是否为Long类型
    本文介绍了一种在Java中判断对象是否属于Long类型的方法,通过定义一个特定的方法来实现这一功能,该方法能够准确地识别并返回结果。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • 本文将从基础概念入手,详细探讨SpringMVC框架中DispatcherServlet如何通过HandlerMapping进行请求分发,以及其背后的源码实现细节。 ... [详细]
  • 深入理解:AJAX学习指南
    本文详细探讨了AJAX的基本概念、工作原理及其在现代Web开发中的应用,旨在为初学者提供全面的学习资料。 ... [详细]
  • 本文详细介绍了如何利用 Bootstrap Table 实现数据展示与操作,包括数据加载、表格配置及前后端交互等关键步骤。 ... [详细]
  • HTML:  将文件拖拽到此区域 ... [详细]
  • 本文通过C++语言实现了一个递归算法,用于解析并计算数学表达式的值。该算法能够处理加法、减法、乘法和除法操作。 ... [详细]
  • 字符串中特定模式出现次数的计算方法
    本文详细探讨了如何高效地计算字符串中特定模式(如'pat')的出现次数,通过实例分析与算法解析,帮助读者掌握解决此类问题的方法。 ... [详细]
  • Requests库的基本使用方法
    本文介绍了Python中Requests库的基础用法,包括如何安装、GET和POST请求的实现、如何处理Cookies和Headers,以及如何解析JSON响应。相比urllib库,Requests库提供了更为简洁高效的接口来处理HTTP请求。 ... [详细]
  • 入门指南:使用FastRPC技术连接Qualcomm Hexagon DSP
    本文旨在为初学者提供关于如何使用FastRPC技术连接Qualcomm Hexagon DSP的基础知识。FastRPC技术允许开发者在本地客户端实现远程调用,从而简化Hexagon DSP的开发和调试过程。 ... [详细]
author-avatar
劲吻2502877607
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有