Bootstrap引导类前面讲解了Netty整体架构以及类之间的关系,然后下面就先从框架的入口引导类(就是一个帮助类,存放了一些框架启动时必要的属性,例如ChannelFactor
Bootstrap 引导类
前面讲解了Netty整体架构以及类之间的关系,然后下面就先从框架的入口引导类(就是一个帮助类,存放了一些框架启动时必要的属性,例如ChannelFactory、ChannelPipeline、Map options)开始,看看是如何把这些类组装在一起的。
类关系
Bootstrap:封装了一些服务端和客户端引导类公共的逻辑,通过构造函数初始化参数
ServerBootstrap:继承了Bootstrap,组装了服务端所需要的相关的类 (创建的是需要连接的channel tcp/ip)
ClientBootstrap:继承了Bootstrap,组装了客户端所需要的相关的类 (创建的是需要连接的channel tcp/ip)
ConnectionlessBootstrap:继承了Bootstrap,(创建的是无需连接的channel udp/ip)
Bootstrap
该类提供了公共的数据结构,有创建Channel工厂、初始化了ChannelPipeline和ChannelPipelineFactory、存放了配置Channel参数的Map。
图-bootstrap0
两个构造方法,一个是无参的,一个是有ChannelFactory(创建Channel对象的工厂),如果在新建类时没有传入,那么一定需要调用setFactory方法传入(如果已经设置了channel factory那么就抛出IllegalStateException异常)
注意: 默认ChannelPipelineFactory 中的实现就是将pipeline中的ChannelHandler浅拷贝到新的DefaultChannelPipeline中,如果对于服务端接受多个Channel时,若ChannelHandler是有状态的话就会有线程安全问题(因为在没有同步情况下,多个线程操作同个内存资源),就必须要自己实现ChannelPipelineFactory接口来创建ChannelPipeline(这样每次获取pipeline都会重新new一个ChannelHandler)。
Map options 存放配置channel的信息,如果给子channel(有服务端接受客户端的channel)配置需要在key的名字用child前缀 例如:child.keepAlive
实现了ExternalResourceReleasable接口,表示该类支持释放外部资源,就是调用factory.releaseExternalResources()方法
ServerBootstrap
下面看看ServerBootstrap是如何绑定某个端口的,做了哪些操作
下面看bind操作,在某个ip和某个端口上进行监听客户端连接操作
Channel bind():会从options获取 key=localAddress,然后会调用bind(SocketAddress localAddress)方法
Channel bind(SocketAddress localAddress) 该方法最终会调用bindAsync(SocketAddress localAddress)方法,该方法是一个异步的,操作结果并不会马上获取到,当一有结果就会notify 添加的ChannelFutureListener,并唤醒阻塞等待结果的线程。 ChannelFututre中的awaitUninterruptibly方法就是会一直阻塞等待结果
ChannelFuture bindAsync(SocketAddress localAddress) 绑定操作都会调用该方法,具体实现如图:
图-s-bootstrap1
从bindAsync方法中并没有看到去绑定的操作,再认真想下就明白,Netty是一种以事件驱动的框架,所有的逻辑操作都Binder(一个实现ChannelUpstreamHandler类)
内部类Binder相关逻辑:
1、当server side Channel创建的时候(也就是上面当 getFactory().newChannel(pipeline)的时候,在channel具体实现类的构造方法中会通过门面类Channels.channel.getPipeline().sendUpstream(new UpstreamChannelEvent())下发一个Channel打开的事件)
2、binder类中的channelOpen()方法会被调用,然后给Channel中的ChannelConfig 设置pipelineFactory和options参数
3、直接调用Channel 的bind方法,实际上实现还是调用Channels.bind() 方法,下发一个DownstreamChannelEvent channel.getPipeline().sendDownstream(new DownstreamChannelEvent), 当达到ChannelPipeline维护的链表尾部时就会将Event交给ChannelSink进行处理,最终给I/O线程进行绑定操作,若绑定成功 channelFuture.setSuccess() 失败就setFail() 由I/O线程通知ChannelFutureListener。
4、childChannelOpen() 此方法是在子channel(服务端接受的客户端channel)创建的时候触发的
5、exceptionCaught() 处理绑定过程中抛出的异常
private final class Binder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress;
private final Map childOptiOns=
new HashMap();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress) {
this.localAddress = localAddress;
}
//新建server socket channel后 在bind之前调用该方法
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
//设置其Channel中ChannelConfig中的pipeline factory
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map allOptiOns= getOptions();
Map parentOptiOns= new HashMap();
for (Entry e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!"pipelineFactory".equals(e.getKey())) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
//继续下发到下个upstream handler
ctx.sendUpstream(evt);
}
//事件上调用Channels.bind 方法下发一个DownstreamEvent bind事件,成功后异步通过ChannelFuture 进行通知回调
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
//绑定操作完成后回调该方法,成功或失败
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
//绑定本地地址成功 在端口上进行监听
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
}
//当服务端接受客户端连接时,新建子channel时调用
@Override
public void childChannelOpen(
ChannelHandlerContext ctx,
ChildChannelStateEvent e) throws Exception {
// Apply child options.
try {
e.getChildChannel().getConfig().setOptions(childOptions);
} catch (Throwable t) {
//将异常通过pipeline 传递到handelr进行处理
fireExceptionCaught(e.getChildChannel(), t);
}
ctx.sendUpstream(e);
}
//处理异常事件
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
bindFuture.setFailure(e.getCause());
ctx.sendUpstream(e);
}
}
下面为bind的时序图
图-boostrap-bind-seq
ClientBootstrap
初始化客户端连接服务端所需要的对象,和ServerBootstrap类包含的属性基本一样,原理也类似,不过这里connect服务端地址为remoteAddress 还可以在本地localAddress进行监听。
下面一步一步分析,client channel是如何连接上server的,又是如何体现出两个reactor模式(main reactor + sub reactor),同样又如何面向事件驱动的?
该为ClientBootstrap中的connect(SocketAddress remoteAddress,SocketAddress localAddress)方法实现图:
图-client-bootstrap1
从ClientBootstrap类中的connect(SocketAddress remoteAddress)方法中看出,既可以connect到服务端,又可以在本地某个端口进行监听(既是客户端又是服务端),主要看ch.connect方法。
ch.conect()会调用抽象类AbstractChannel(所有具体Channel实现都会继承该类)中的connect方法,如图:
图-client-bootstrap2
实际上是调用了Channels.connect() 方法 如图:
图-client-bootstrap3
发送一个将要去connect服务端的事件,对于Channel主动改变状态或主动发送消息会下发一个Downstream ChannelEvent 下游事件,前面Netty架构分析中说到:在ChannelPipeline中链表尾部开始向前执行ChannelDownstreamHandler至头部时,否会将该最终会交给ChannelSink进行处理,具体代码实现如图:
图-client-bootstrap4
具体看看NioClientSocketPipelineSink中的connect实现
如图:
图-client-bootstrap4-1
对于上图具体实现流程为:
1、先调用 nio channel中的connect方法,该方法是个异步方法,如果connect返回true表示连接成功,若成功进入2 否则进入 3
2、通过NioWorker(封装了nio selector的一个I/O线程)将可读事件注册到selector上
3、没有立刻连接成功,则通过NioClientBoss(main reactor) 注册一个OP_CONNECT事件,如果该连接被服务端接受那么操作系统层面的selector就会唤醒用户中的阻塞的selector线程,并将就绪的Channel和就绪的状态封装到SelectionKey中,则进入4
4、selector监听到至少一个就绪的Channel就会调用NioClientBoss中的process方法,如果是isConnectable则调用connect(Selection k) 进入5
5、调用channel.finishConnect()方法成功返回true后,将Channel的后面的读和写事件放在NioWorker中(sub reactor)(若不调用的话就抛出java.nio.channels.NotYetConnectedException异常 由于该channel是异步的,所以不会阻塞在connect直到有连接,所以需要判断 nio channel.finishConnect()是否完成连接操作,若完成连接则返回true,否则为false, 后面才可以进行读和写)。
步骤2具体代码实现如图:
图-client-bootstrap9
步骤3具体代码实现如图:
图-client-bootstrap5
图-client-bootstrap6
步骤4、5具体代码如图:
图-client-bootstrap7
图-client-bootstrap8
summary:
1、首先连接请求不是直接去调用nio channel connect,而是通过绕了一大圈,首先包装一个connect事件,经过ChannelPipeline中的各个ChannelHandler,处理完后才会进行最终的nio channel connect。
2、由于nio channel是非阻塞的,所以任何操作(连接、读和写)都不会立即返回结果,而是需要通过信号分离器(Selector)进行callback 通知,所以Netty就会将连接(对于server side就是accept)操作用bossCount个线程不过一般就1个Selector处理Channel,将Channel读和写事件就会交给workCount个线程的Selector进行处理这些Channel。
Netty Main Reactor+Sub Reactor的好处是什么?具体看前面讲的Reactor模式
当面对大量并发连接时不会影响现已连接的用户的读和写逻辑,如果是单个线程(一个Selector)来处理时,其他读和写就绪的Channel就会在后面等待前面大量accpet事件的Channel进行处理,如果按现在的线程模型则读和写事件是单独线程处理,互补影响。
瓶颈: java上线程资源是有限的,一个线程上会有很多个Channel,那么在这个线程上的Channel同样也会存在等待问题,最理想情况是一个Channel对应于线程来处理,但是这个又回到了之前阻塞Channel,经过各种系统性能指标分析得到:线程的切换和线程同步、线程占用内存资源(栈空间越大,堆空间会减小)都是会消耗系统资源的,所以综合来说nio 还是大大提高了系统吞吐量和提高了网络传输效率。