热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

1.Netty准备知识:JavaNIO

前言:我们知道,Netty是基于NIO开发的一套框架,在学习Netty之前,我们先学习下JavaNIO。一、IO多路复用模型IO多路复用模型使用了Reactor设计模式,主要有三种

前言:我们知道,Netty是基于NIO开发的一套框架,在学习Netty之前,我们先学习下Java NIO。

一、IO多路复用模型

  IO多路复用模型使用了Reactor设计模式,主要有三种实现:Reacotr单线程、Reactor多线程、Reactor主从模式。

1. Reactor单线程

  在Reactor单线程模式中,所有客户端的请求处理都交给一个线程,串行化处理,效率较低。

技术图片

2. Reactor多线程

  在Reactor多线程模式中,acceptor线程负责接受客户端请求并将请求处理任务交给线程池,提升了请求处理速度。但是当client数量过多时,单线程就无法同时处理那么多的请求,造成瓶颈问题。

技术图片

3. Reactor主从模式

  为了解决Reactor多线程请求转发瓶颈问题,Reactor主从模式将acceptor设计为线程池,用以处理客户端请求。

技术图片

二、NIO使用示例

1. NIO服务端通讯示例

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

public class MultiplexerTimeServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;//保证线程可见(volitile关键字)

    /**
     * 初始化多路复用器、绑定监听端口
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();//创建多路复用器
            servChannel = ServerSocketChannel.open();//打开ServerSocketChannel,用于监听客户端链接
            servChannel.configureBlocking(false);//设置非阻塞
            servChannel.socket().bind(new InetSocketAddress(port), 1024);//绑定端口
            servChannel.register(selector, SelectionKey.OP_ACCEPT);//注册监听(监听ACCEPT事件)
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    /**
     * 无限轮询准备就绪的key,并对其进行处理
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : " + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new java.util.Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes <0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

服务端通讯序列图如下:

技术图片

2. 客户端通讯示例

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
    }
}

public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();// 创建多路复用器
            socketChannel = SocketChannel.open();// 打开SocketChannel
            socketChannel.configureBlocking(false);// 设置非阻塞模式
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * 创建连接,无线循环准备好的key并对其进行处理
     */
    @Override
    public void run() {
        try {
            // 创建连接
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);//注册READ事件
                    doWrite(sc);
                } else
                    System.exit(1);// 连接失败,进程退出
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes <0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }
            }
        }
    }

    private void doConnect() throws IOException {
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }
}

客户端通讯序列图如下:

技术图片

三、NIO类库介绍

1. 缓冲区Buffer

  Buffer是一个对象,它包含一些要写入或读出的数据。在NIO中,所有的数据都是在缓冲区处理的,读数据时,直接读进缓冲区,写数据时,直接写在缓冲区。

  Buffer类库继承关系:

技术图片

我们最常用的就是ByteBuffer,这里介绍下ByteBuffer的几个参数及使用方法:

capacity 数组容量,创建后不可变
limit 当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度
position 位置,下一次要被读或被写的位置
mark 标记,调用mark()来设置mark=position,调用reset()可以让position恢复到标记位置
clear() 令position=0;limit=capacity;mark=-1;  但是不清除byte数组内容
reset() 把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方
flip() 令limit=position;position=0
allocate(int capavity) 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器
allocateDirect(int capacity) 在堆外内存中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器
wrap(byte[] array) 将byte数组包装成ByteBuffer

tips:ByteBuffer有两个比较重要的实现,HeapByteBuffer 和 DirectByteBuffer。

  HeapByteBuffer:在堆内申请的内存,利于维护

  DirectByteBuffer:在堆外申请的内存,实现零拷贝,提升数据操作速度(外部读取JVM堆中数据是先把JVM数据读到一个内存块中,然后在这个块里读取,使用堆外内存可省去这一步骤)。

2. 通道Channel

特性:

1. 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
2. 通道可以异步地读写。
3. 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
 
重要实现:
FileChannel:从文件中读写数据
DatagramChannel:通过UDP读写网络中的数据,因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入,它发送和接收的是数据包
SocketChannel:通过TCP读写网络中的数据
ServerSocketChannel:监听新进来的TCP连接,对每一个连接创建一个SocketChannel

  示例(这里只举例了FileChannel 和 DatagramChannel,其他两种看上面第二部分):

技术图片技术图片
//FileChannel示例:

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");

FileChannel inChannel = aFile.getChannel();
// 设置1MB的缓冲区

ByteBuffer buf = ByteBuffer.allocate(1024);
// 读取数据到buf中,并返回字节数

int bytesRead = inChannel.read(buf);

while (bytesRead != -1) {

    System.out.println("Read " + bytesRead);

    buf.flip(); // 重设缓冲区 postion = 0 ,limit = 原本position

    while(buf.hasRemaining()){ //缓冲区中是否还有内容
        // 或者使用buf.get(bytes),将数据读进字节数组中

        System.out.print((char) buf.get());

    }

    buf.clear();//清空缓存区
    // 缓冲区只有1MB大小,需要循环读取

    bytesRead = inChannel.read(buf);

}
aFile.close();



//DataGramChannel:

// 打开连接
DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(port));

// buffer接收channel的数据
channel.receive(buf);//如果buffer容不下收到的数据,多出的数据将被抛弃

// 发送数据
channel.send(buf, new InetSocketAddress("baidu.com", 80));

// 连接到特定地址:由于UDP是无连接的,连接到特定的地址并不会像TCP通道那样创建一个真正的连接,而是锁住DatagramChannel,让其只能从特定地址收发数据
channel.connect(new InetSocketAddress("baidu.com", 80));
连接后,可以使用read()和write()方法,但数据传送无保证
channel.read(buf);
channel.write(buf);
Channel示例

3. 多路复用器Selector

  Selector不断轮询注册在其上的Channel,如果某个Channel上有新的TCP连接、读、写操作,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey就可以获取到就绪Channel集合,进行后续的IO操作。

  一些方法:

// 1. 创建Selector
Selector selector = Selector.open();

// 2. 注册通道
channel.configureBlocking(false);//channel必须处于非阻塞状态
SelectionKey key = channel.register(selector, Selector.OP_READ);
   
int select():阻塞到至少有一个通道在你注册的事件上就绪了,返回值为多少通道已就绪
int select(long timeout):同上,最长阻塞timeout毫秒
int selectNow():不阻塞,不管什么通道都立即返回
Set selectedKeys = selector.selectedKeys():返回已就绪的通道的SelectedKey

我们获取到的是SelectionKey,该对象中包含了一些有价值的属性:

insterest集合:OP_CONNECT、OP_ACCEPT、OP_READ、OP_WRITE   
    int interestSet = selectionKey.interestOps();
    boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPTboolean isInterestedInCOnnect= interestSet & SelectionKey.OP_CONNECT;
    boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
    boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

ready集合:已准备就绪的操作的集合(你注册了监听connect,那么除了connect其它都是false),在一次选择(Selection)之后,会首先访问这个ready set
     int readySet = selectionKey.readyOps();
     boolean isAccept = selectionKey.isAcceptable();
     boolean isCOnnect= selectionKey.isConnectable();
     boolean isReadable = selectionKey.isReadable();
     boolean isWritable = selectionKey.isWritable();

channel:Channel  channel  = selectionKey.channel();
selector:Selector selector = selectionKey.selector();
附加对象(可选):SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

1. Netty准备知识:Java NIO


推荐阅读
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 本文讲述了作者通过点火测试男友的性格和承受能力,以考验婚姻问题。作者故意不安慰男友并再次点火,观察他的反应。这个行为是善意的玩人,旨在了解男友的性格和避免婚姻问题。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
author-avatar
mobiledu2502853597
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有