热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Java]高并发框架Netty简介

文章目录Netty简介NIOEvent事件线程模型Pipeline模型零拷贝与ByteBuf示例JavaNIO服务端Netty服务端Netty客户端Netty是一款异步的事件驱动的

文章目录

  • Netty简介
    • NIO
    • Event事件
    • 线程模型
    • Pipeline模型
    • 零拷贝与ByteBuf
  • 示例
    • Java NIO服务端
    • Netty服务端
    • Netty客户端



Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。
Netty简介

在这里插入图片描述

NIO

NIO(Non Blocking IO):非阻塞IO;是Java1.4中引入的。提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现,都支持阻塞和非阻塞两种模式。

NIO通过设定套接字为非阻塞模式(configureBlocking(false)),然后借助Selector选择器来实现。他使用了事件通知API以确定在一组非阻塞套接字中有哪些已就绪能够进行的I/O操作。

多路复用器Selector是Java NIO编程的基础,多路复用器提供选择已经就绪的任务的能力,简单的说,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

Event事件

Netty 使用不同的事件来通知状态的改变或者是操作的状态。事件可能包括:

  • 连接已被激活或者连接失活
  • 数据读取;
  • 用户事件;
  • 错误事件。
  • 打开或者关闭到远程节点的连接;
  • 将数据写到或者冲刷到套接字。

线程模型

Netty 主要基于主从 Reactors 多线程模型做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:

  • MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。
  • SubReactor 负责相应通道的 IO 读写请求。
  • 非IO请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。

在这里插入图片描述

Netty线程模型的基石是建立在EventLoop上的,其采用协同设计用于处理并发和网络。

  • 服务端启动时,通常会有两个NioEventLoopGroup:一个是监听线程组,主要是监听客户端请求,另一个是工作线程组,主要是处理与客户端的数据通讯。
  • 客户端只有一个NioEventLoopGroup,就是用来处理与服务端通信的线程组。
  • NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

Pipeline模型

Netty的Pipeline模型用的是责任链设计模式,当boss线程监控到绑定端口上有accept事件:

  • 为该socket连接实例化Pipeline,并将InboundHandler和OutboundHandler按序加载到Pipeline中;
  • 然后将该socket连接(也就是Channel对象)挂载到selector上。

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有read或write事件,然后通过线程池去执行Pipeline的业务流。selector如何查询哪些socket连接有read或write事件,主要取决于调用操作系统的哪种IO多路复用内核:

  • 如果是select(注意,此处的select是指操作系统内核的select IO多路复用,不是Netty的seletor对象),那么将会遍历所有socket连接,依次询问是否有read或write事件,最终操作系统内核将所有IO事件的socket连接返回给Netty进程;当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝。
  • 如果是epoll,性能将会大幅提升,因为他基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗。

Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是head和tail,当selector轮询到socket有read事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandler的ChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler。

ChannelHandler分为InbounHandler和OutboundHandler,InboundHandler用来处理接收消息,OutboundHandler用来处理发送消息。head的ChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socket的read和write。tail的ChannelHandler只是InboundHandler,read的Pipleline处理将会最终到达tail。

零拷贝与ByteBuf

Netty使用多路复用技术,提高处理连接的并发性;通过零拷贝更多提高性能:

  • Netty的接收和发送数据采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝;
  • Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象进行一次操作;
  • Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题;
  • 内存池:为了减少堆外直接内存的分配和回收产生的资源损耗问题,Netty提供了基于内存池的缓冲区重用机制;
  • 使用主从Reactor多线程模型,提高并发性;
  • 采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降;

ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便对整段字节缓存进行读写;也支持get/set,方便对其中每一个字节进行读写。他有三种模式:

  • Heap Buffer 堆缓冲区:是ByteBuf最常用的模式,他将数据存储在堆空间。
  • Direct Buffer 直接缓冲区:内存分配都不发生在堆,jvm通过本地方法调用分配内存,这样做有两个好处:
    • 通过免去中间交换的内存拷贝,提升IO处理速度;
    • 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响。
  • Composite Buffer 复合缓冲区:相当于多个不同ByteBuf的视图。

示例

本节给出一个传统NIO服务端,以及使用Netty实现的服务端与客户端聊天程序示例。

Java NIO服务端

通过ServerSocketChannel可以做服务端侦听;为使其非阻塞,需要设定Blocking为false。对于每一个Channel,都注册到Selector,在有事件到达时,会触发。

在遍历事件集合时,对于处理过的元素要移除,避免被再次触发。

public class NioSelectorServer {public static void main(String[] args) throws IOException {ServerSocketChannel srvChannel &#61; ServerSocketChannel.open();srvChannel.socket().bind(new InetSocketAddress(9000));srvChannel.configureBlocking(false);Selector select &#61; Selector.open();SelectionKey selKey &#61; srvChannel.register(select, SelectionKey.OP_ACCEPT);System.out.println("Server started at: " &#43; srvChannel.getLocalAddress());while (true){select.select();Set<SelectionKey> setKey &#61; select.selectedKeys();for (Iterator<SelectionKey> it&#61;setKey.iterator(); it.hasNext();){SelectionKey key &#61; it.next();if(key.isAcceptable()){acceptClient(select, key);}else if(key.isReadable()) {handleClient(key);}it.remove();}}}private static void acceptClient(Selector select, SelectionKey key) throws IOException {ServerSocketChannel srvChannel &#61; (ServerSocketChannel)key.channel();SocketChannel socketChannel &#61; srvChannel.accept();socketChannel.configureBlocking(false);SelectionKey selKey &#61; socketChannel.register(select, SelectionKey.OP_READ);System.out.println("Client connected: " &#43; socketChannel.getRemoteAddress());}private static void handleClient(SelectionKey key) throws IOException {SocketChannel socketChannel &#61; (SocketChannel)key.channel();ByteBuffer buf &#61; ByteBuffer.allocate(100);int nLen &#61; socketChannel.read(buf);if(nLen>0){System.out.println("Received: " &#43; new String(buf.array()));}else if(nLen&#61;&#61;-1){ // client closedSystem.out.println("Client closed: " &#43; socketChannel.getRemoteAddress());socketChannel.close();}}
}

Netty服务端

Netty服务端有固定的流程&#xff1a;创建主线程池以及工作线程池&#xff0c;然后设定channel对应类&#xff08;阻塞、非阻塞&#xff0c;及其他&#xff09;&#xff0c;以及添加channelHandler。

因只收发字符串串&#xff0c;编解码就使用Netty的StringDecoder与StringEncoder即可。

public class ChatServer {public static void main(String[] args) {EventLoopGroup bossGroup &#61; new NioEventLoopGroup(1);EventLoopGroup workGroup &#61; new NioEventLoopGroup(4);try {ServerBootstrap bootstrap &#61; new ServerBootstrap();bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {&#64;Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline &#61; ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new ChatServerHandler());}});ChannelFuture cf &#61; bootstrap.bind(9001).sync();out.println("Server started at: " &#43; cf.channel().localAddress());// wait for closecf.channel().closeFuture().sync();} catch (Exception ex) {ex.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}out.println("Chat server shutdown");}
}

在聊天服务端中&#xff0c;执行继承SimpleChannelInboundHandler即可&#xff1a;

  • channelRead0&#xff1a;Simple中最重要的方法&#xff0c;返回指定类型的接收值&#xff1b;
  • channelActive&#xff1a;当有连接到来时触发&#xff1b;
  • channelInactive&#xff1a;有连接断开时触发&#xff1b;
  • channelReadComplete&#xff1a;读取完成时触发&#xff08;可以在里面做一些Flush等操作&#xff09;&#xff1b;
  • exceptionCaught&#xff1a;有异常时触发&#xff0c;此时可通过ctx.close()关闭连接&#xff1b;

import static java.lang.System.out;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {private static ChannelGroup chGroup &#61; new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);&#64;Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel ch &#61; ctx.channel();chGroup.writeAndFlush("[Client]" &#43; ch.localAddress() &#43; " online " &#43; LocalDateTime.now());chGroup.add(ch);out.println("Connect from: " &#43; ch.remoteAddress());}&#64;Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel ch &#61; ctx.channel();chGroup.writeAndFlush("[Client]" &#43; ch.localAddress() &#43; " offline " &#43; LocalDateTime.now());out.println("Client closed: " &#43; ch.remoteAddress());}&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {Channel from &#61; ctx.channel();chGroup.forEach(ch -> {if (ch &#61;&#61; from) {ch.writeAndFlush("[Self]: " &#43; msg);} else {ch.writeAndFlush(from.remoteAddress() &#43; ": " &#43; msg);}});}&#64;Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {out.println("read complete");}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {out.println(ctx.channel().remoteAddress() &#43; cause.getMessage());ctx.close();}
}

Netty客户端

客户端通过Bootstrap初始化线程池&#xff0c;设定ChannelHandler&#xff0c;并发起连接&#xff1b;

通过Scanner获取输入&#xff0c;并发送到服务端。

public class ChatClient {public static void main(String[] args) {EventLoopGroup group &#61; new NioEventLoopGroup();try {Bootstrap bootstrap &#61; new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {&#64;Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline &#61; ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new ChatClientHandler());}});ChannelFuture cf &#61; bootstrap.connect("localhost", 9001).sync();Channel ch &#61; cf.channel();out.println("Connected: " &#43; ch.localAddress() &#43; " -> " &#43; ch.remoteAddress());// read/writeScanner scan &#61; new Scanner(System.in);out.print("Msg to Send: ");while (ch.isActive() && scan.hasNext()) {String msg &#61; scan.nextLine();if ("quit".equals(msg))break;ch.writeAndFlush(msg);}} catch (Exception ex) {ex.printStackTrace();} finally {group.shutdownGracefully();}}
}

客户端Handler非常简单&#xff0c;只需输出接收到内容&#xff0c;并处理异常即可&#xff1a;

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {out.println(msg);}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {out.println(ctx.channel().remoteAddress() &#43; cause.getMessage());ctx.close();}
}


推荐阅读
author-avatar
嘻嘻520000000
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有