NIO(Non Blocking IO):非阻塞IO;是Java1.4中引入的。提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现,都支持阻塞和非阻塞两种模式。
NIO通过设定套接字为非阻塞模式(configureBlocking(false)
),然后借助Selector选择器来实现。他使用了事件通知API以确定在一组非阻塞套接字中有哪些已就绪能够进行的I/O操作。
多路复用器Selector是Java NIO编程的基础,多路复用器提供选择已经就绪的任务的能力,简单的说,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
Netty 使用不同的事件来通知状态的改变或者是操作的状态。事件可能包括:
Netty 主要基于主从 Reactors 多线程模型做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:
Netty线程模型的基石是建立在EventLoop上的,其采用协同设计用于处理并发和网络。
Netty的Pipeline模型用的是责任链设计模式,当boss线程监控到绑定端口上有accept事件:
一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有read或write事件,然后通过线程池去执行Pipeline的业务流。selector如何查询哪些socket连接有read或write事件,主要取决于调用操作系统的哪种IO多路复用内核:
Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是head和tail,当selector轮询到socket有read事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandler的ChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler。
ChannelHandler分为InbounHandler和OutboundHandler,InboundHandler用来处理接收消息,OutboundHandler用来处理发送消息。head的ChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socket的read和write。tail的ChannelHandler只是InboundHandler,read的Pipleline处理将会最终到达tail。
Netty使用多路复用技术,提高处理连接的并发性;通过零拷贝更多提高性能:
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便对整段字节缓存进行读写;也支持get/set,方便对其中每一个字节进行读写。他有三种模式:
本节给出一个传统NIO服务端,以及使用Netty实现的服务端与客户端聊天程序示例。
通过ServerSocketChannel可以做服务端侦听;为使其非阻塞,需要设定Blocking为false。对于每一个Channel,都注册到Selector,在有事件到达时,会触发。
在遍历事件集合时,对于处理过的元素要移除,避免被再次触发。
public class NioSelectorServer {public static void main(String[] args) throws IOException {ServerSocketChannel srvChannel &#61; ServerSocketChannel.open();srvChannel.socket().bind(new InetSocketAddress(9000));srvChannel.configureBlocking(false);Selector select &#61; Selector.open();SelectionKey selKey &#61; srvChannel.register(select, SelectionKey.OP_ACCEPT);System.out.println("Server started at: " &#43; srvChannel.getLocalAddress());while (true){select.select();Set<SelectionKey> setKey &#61; select.selectedKeys();for (Iterator<SelectionKey> it&#61;setKey.iterator(); it.hasNext();){SelectionKey key &#61; it.next();if(key.isAcceptable()){acceptClient(select, key);}else if(key.isReadable()) {handleClient(key);}it.remove();}}}private static void acceptClient(Selector select, SelectionKey key) throws IOException {ServerSocketChannel srvChannel &#61; (ServerSocketChannel)key.channel();SocketChannel socketChannel &#61; srvChannel.accept();socketChannel.configureBlocking(false);SelectionKey selKey &#61; socketChannel.register(select, SelectionKey.OP_READ);System.out.println("Client connected: " &#43; socketChannel.getRemoteAddress());}private static void handleClient(SelectionKey key) throws IOException {SocketChannel socketChannel &#61; (SocketChannel)key.channel();ByteBuffer buf &#61; ByteBuffer.allocate(100);int nLen &#61; socketChannel.read(buf);if(nLen>0){System.out.println("Received: " &#43; new String(buf.array()));}else if(nLen&#61;&#61;-1){ // client closedSystem.out.println("Client closed: " &#43; socketChannel.getRemoteAddress());socketChannel.close();}}
}
Netty服务端有固定的流程&#xff1a;创建主线程池以及工作线程池&#xff0c;然后设定channel对应类&#xff08;阻塞、非阻塞&#xff0c;及其他&#xff09;&#xff0c;以及添加channelHandler。
因只收发字符串串&#xff0c;编解码就使用Netty的StringDecoder与StringEncoder即可。
public class ChatServer {public static void main(String[] args) {EventLoopGroup bossGroup &#61; new NioEventLoopGroup(1);EventLoopGroup workGroup &#61; new NioEventLoopGroup(4);try {ServerBootstrap bootstrap &#61; new ServerBootstrap();bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {&#64;Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline &#61; ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new ChatServerHandler());}});ChannelFuture cf &#61; bootstrap.bind(9001).sync();out.println("Server started at: " &#43; cf.channel().localAddress());// wait for closecf.channel().closeFuture().sync();} catch (Exception ex) {ex.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}out.println("Chat server shutdown");}
}
在聊天服务端中&#xff0c;执行继承SimpleChannelInboundHandler
即可&#xff1a;
import static java.lang.System.out;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {private static ChannelGroup chGroup &#61; new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);&#64;Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel ch &#61; ctx.channel();chGroup.writeAndFlush("[Client]" &#43; ch.localAddress() &#43; " online " &#43; LocalDateTime.now());chGroup.add(ch);out.println("Connect from: " &#43; ch.remoteAddress());}&#64;Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel ch &#61; ctx.channel();chGroup.writeAndFlush("[Client]" &#43; ch.localAddress() &#43; " offline " &#43; LocalDateTime.now());out.println("Client closed: " &#43; ch.remoteAddress());}&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {Channel from &#61; ctx.channel();chGroup.forEach(ch -> {if (ch &#61;&#61; from) {ch.writeAndFlush("[Self]: " &#43; msg);} else {ch.writeAndFlush(from.remoteAddress() &#43; ": " &#43; msg);}});}&#64;Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {out.println("read complete");}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {out.println(ctx.channel().remoteAddress() &#43; cause.getMessage());ctx.close();}
}
客户端通过Bootstrap初始化线程池&#xff0c;设定ChannelHandler&#xff0c;并发起连接&#xff1b;
通过Scanner获取输入&#xff0c;并发送到服务端。
public class ChatClient {public static void main(String[] args) {EventLoopGroup group &#61; new NioEventLoopGroup();try {Bootstrap bootstrap &#61; new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {&#64;Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline &#61; ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new ChatClientHandler());}});ChannelFuture cf &#61; bootstrap.connect("localhost", 9001).sync();Channel ch &#61; cf.channel();out.println("Connected: " &#43; ch.localAddress() &#43; " -> " &#43; ch.remoteAddress());// read/writeScanner scan &#61; new Scanner(System.in);out.print("Msg to Send: ");while (ch.isActive() && scan.hasNext()) {String msg &#61; scan.nextLine();if ("quit".equals(msg))break;ch.writeAndFlush(msg);}} catch (Exception ex) {ex.printStackTrace();} finally {group.shutdownGracefully();}}
}
客户端Handler非常简单&#xff0c;只需输出接收到内容&#xff0c;并处理异常即可&#xff1a;
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {out.println(msg);}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {out.println(ctx.channel().remoteAddress() &#43; cause.getMessage());ctx.close();}
}