热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty学习(二):服务端与客户端通信

Netty学习(二):服务端与客户端通信作者:Grey原文地址:博客园:Netty学习(二):服务端与客户端通信CSDN:Netty学习(二):服务端与客户端通信说明Netty中服务端和客户端通信,包括两个方面,一个是IO处理逻辑的配置,一个是通信载

Netty 学习(二):服务端与客户端通信

作者: Grey

博客园:Netty 学习(二):服务端与客户端通信

CSDN:Netty 学习(二):服务端与客户端通信

说明

Netty 中服务端和客户端通信,包括两个方面,一个是 IO 处理逻辑的配置,一个是通信载体的设置。

IO 处理逻辑

无论是客户端,还是服务端,都是通过 Bootstrap 的 handler()方法指定的。我们通过模拟一个简单的客户端发送消息给服务端,服务端回写消息给客户端的示例程序来说明

服务端代码如下(每个配置见注释说明)

package netty.v3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Netty 自动绑定递增端口,增加了IO处理逻辑
 *
 * @author Grey
 * @date 2022/9/12
 * @since
 */
public class NettyServer {

    public static void main(String[] args) {
        // 引导服务端的启动
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 用于监听端口,接收新连接的线程组
        NioEventLoopGroup boss = new NioEventLoopGroup();
        // 表示处理每一个连接的数据读写的线程组
        NioEventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap.group(boss, worker)
                // 指定IO模型为NIO
                .channel(NioServerSocketChannel.class)
                // 可以给服务端的Channel指定一些属性,非必须
                .attr(AttributeKey.newInstance("serverName"), "nettyServer")
                // 可以给每一个连接都指定自定义属性,非必须
                .childAttr(AttributeKey.newInstance("clientKey"), "clientValue")
                // 使用option方法可以定义服务端的一些TCP参数
                // 这个设置表示系统用于临时存放已经完成三次握手的请求的队列的最大长度,
                // 如果连接建立频繁,服务器创建新的连接比较慢,则可以适当调大这个参数
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 以下两个配置用于设置每个连接的TCP参数
                // SO_KEEPALIVE: 表示是否开启TCP底层心跳机制,true表示开启
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // TCP_NODELAY:表示是否开启Nagle算法,true表示关闭,false表示开启
                // 如果要求高实时性,有数据发送时就马上发送,就设置为关闭;
                // 如果需要减少发送次数,减少网络交互,就设置为开启。
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 定义后面每一个连接的数据读写
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            // 不管服务端还是客户端,收到数据后都会调用channelRead()方法
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));
                                // 服务端将读到的数据返回客户端
                                System.out.println(new Date() + ": 服务端写出数据");
                                ctx.channel().writeAndFlush(getByteBuf(ctx));
                            }
                            private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
                                byte[] bytes = "hello world from server!".getBytes(StandardCharsets.UTF_8);
                                ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes(bytes);
                                return buffer;
                            }
                        });
                    }
                });
        // 本地绑定一个8000端口启动服务
        bind(serverBootstrap, 8000);
    }

    public static void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("端口[" + port + "]绑定成功");
            } else {
                System.err.println("端口[" + port + "]绑定失败");
                bind(serverBootstrap, port + 1);
            }
        });
    }
}

客户端代码如下(关于每个配置的说明见注释)

package netty.v3;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Netty 可自动重连的客户端,增加了IO处理逻辑
 *
 * @author Grey
 * @date 2022/9/12
 * @since
 */
public class NettyClient {
    static final int MAX_RETRY = 6;
    static final String HOST = "localhost";
    static final int PORT = 8000;

    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();

        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap
                // 指定线程模型
                .group(group)
                // 指定IO类型为NIO
                .channel(NioSocketChannel.class)
                // attr可以为客户端Channel绑定自定义属性
                .attr(AttributeKey.newInstance("clientName"), "nettyClient")
                // 连接的超时时间,如果超过这个时间,仍未连接到服务端,则表示连接失败
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                // 表示是否开启TCP底层心跳机制,true表示开启
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 是否开启Nagle算法,如果要求高实时性,有数据就马上发送,则为true
                // 如果需要减少发送次数,减少网络交互,就设置为false
                .option(ChannelOption.TCP_NODELAY, true)
                // IO处理逻辑
                .handler(new ChannelInitializer<>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            // 这个方法会在客户端连接建立成功之后被调用
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) {
                                System.out.println(new Date() + ": 客户端写出数据");
                                // 包装成ByteBuf并发送到服务端
                                // 注:Netty中的数据是以 ByteBuf 为单位的。
                                ctx.channel().writeAndFlush(getByteBuf(ctx));
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(new Date() + ": 客户端读取到的数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));
                            }

                            private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
                                ByteBuf buffer = ctx.alloc().buffer();
                                byte[] bytes = "hello world".getBytes(StandardCharsets.UTF_8);
                                buffer.writeBytes(bytes);
                                return buffer;
                            }
                        });
                    }
                });
        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    private static void connect(final Bootstrap bootstrap, final String host, final int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("连接成功!");
            } else if (retry == 0) {
                System.err.println("重试次数已经使用完毕");
            } else {
                // 第几次重试
                int order = (MAX_RETRY - retry) + 1;
                // 本次的重试间隔
                int delay = 1 < connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }

}

代码说明:

无论是服务端还是客户端

channelActive方法会在客户端连接建立成功之后被调用。

channelRead方法在收到数据后都会调用

先运行服务端,控制台输出

端口[8000]绑定成功

然后运行客户端,客户端打印

连接成功!
Wed Sep 14 19:58:50 CST 2022: 客户端写出数据
Wed Sep 14 19:58:50 CST 2022: 客户端读取到的数据 -> hello world from server!

服务端打印

端口[8000]绑定成功
Wed Sep 14 19:58:50 CST 2022: 服务端读到数据 -> hello world
Wed Sep 14 19:58:50 CST 2022: 服务端写出数据

数据载体

Netty 中的数据载体是 ByteBuf,ByteBuf 的结构如下

image

ByteBuf 和 java.nio.ByteBuffer 类似,但是提供了比 java.nio.ByteBuffer更方便使用的 API。

关于 Java 的java.nio.ByteBuffer的使用,参考:Java IO学习笔记二:DirectByteBuffer与HeapByteBuffer

接下来使用一个示例来说明 ByteBuf 的使用

代码如下:

package bytebuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
 * ByteBuf 示例
 * @author Grey
 * @date 2022/9/14
 * @since
 */
public class ByteBufTest {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

        print("allocate ByteBuf(9, 100)", buffer);

        // write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        print("writeBytes(1,2,3,4)", buffer);

        // write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写, 写完 int 类型之后,写指针增加4
        buffer.writeInt(12);
        print("writeInt(12)", buffer);

        // write 方法改变写指针, 写完之后写指针等于 capacity 的时候,buffer 不可写
        buffer.writeBytes(new byte[]{5});
        print("writeBytes(5)", buffer);

        // write 方法改变写指针,写的时候发现 buffer 不可写则开始扩容,扩容之后 capacity 随即改变
        buffer.writeBytes(new byte[]{6});
        print("writeBytes(6)", buffer);

        // get 方法不改变读写指针
        System.out.println("getByte(3) return: " + buffer.getByte(3));
        System.out.println("getShort(3) return: " + buffer.getShort(3));
        System.out.println("getInt(3) return: " + buffer.getInt(3));
        print("getByte()", buffer);


        // set 方法不改变读写指针
        buffer.setByte(buffer.readableBytes() + 1, 0);
        print("setByte()", buffer);

        // read 方法改变读指针
        byte[] dst = new byte[buffer.readableBytes()];
        buffer.readBytes(dst);
        print("readBytes(" + dst.length + ")", buffer);

    }

    private static void print(String action, ByteBuf buffer) {
        System.out.println("after ===========" + action + "============");
        System.out.println("capacity(): " + buffer.capacity());
        System.out.println("maxCapacity(): " + buffer.maxCapacity());
        System.out.println("readerIndex(): " + buffer.readerIndex());
        System.out.println("readableBytes(): " + buffer.readableBytes());
        System.out.println("isReadable(): " + buffer.isReadable());
        System.out.println("writerIndex(): " + buffer.writerIndex());
        System.out.println("writableBytes(): " + buffer.writableBytes());
        System.out.println("isWritable(): " + buffer.isWritable());
        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
        System.out.println();
    }
}
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

表示分配了一块ByteBuf。相关的参数值如下图

image

buffer.writeBytes(new byte[]{1, 2, 3, 4});

写入四个 byte 类型的数据,ByteBuf变化如下

image

buffer.writeInt(12);

由于int类型是4个Byte,所以,写入后,buffer变化如下

image

buffer.writeBytes(new byte[]{5});

写入一个byte类型的数据,此时,写空间已经满了。如下图

image

buffer.writeBytes(new byte[]{6});

继续写入,由于写空间已经满了,所以要进行扩容,扩容后的结构如下

image

buffer.getByte(3);
buffer.getShort(3);
buffer.getInt(3);
buffer.setByte(buffer.readableBytes() + 1, 0);

由于get/set操作不改变读写指针,所以buffer还是保持原样

image

byte[] dst = new byte[buffer.readableBytes()];
buffer.readBytes(dst);

read方法会改变读指针,改变后结构如下

image

图例

本文所有图例见:processon: Netty学习笔记

代码

hello-netty

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码


推荐阅读
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 本文介绍了使用数据库管理员用户执行onstat -l命令来监控GBase8s数据库的物理日志和逻辑日志的使用情况,并强调了对已使用的逻辑日志是否及时备份的重要性。同时提供了监控方法和注意事项。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
author-avatar
爱你116564
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有