热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

吃透Netty源码系列五十八之Unsafe和AbstractUnsafe

吃透Netty源码系列五十八之Unsafe和AbstractUnsafeNetty的Unsafe接口AbstractUnsafe基本抽象实现属性recvBufAllocHandle

吃透Netty源码系列五十八之Unsafe和AbstractUnsafe

  • Netty的Unsafe接口
    • AbstractUnsafe基本抽象实现
      • 属性
      • recvBufAllocHandle接受缓冲区处理器
      • register注册到事件循环
        • register0执行注册逻辑
        • bind绑定地址
        • disconnect断开连接
        • close关闭通道和出站缓冲区
          • doClose0关闭通道
          • fireChannelInactiveAndDeregister传递通道失效和注销事件
          • doDeregister注销事件
        • shutdownOutput出站缓冲区关闭处理


Netty的Unsafe接口

这个Unsafe可不是JDK原生的Unsafe哦,主要就是一些直接跟IO底层直接相关的通用操作:

interface Unsafe {// 接受数据的时候用于分配字节缓冲区的处理器RecvByteBufAllocator.Handle recvBufAllocHandle();// 本地地址SocketAddress localAddress();// 远程地址SocketAddress remoteAddress();//向事件循环注册通道,完成后回调void register(EventLoop eventLoop, ChannelPromise promise);// 绑定本地地址,完成后回调void bind(SocketAddress localAddress, ChannelPromise promise);// 连接void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);// 断开连接,完成回调void disconnect(ChannelPromise promise);// 关闭连接,完成回调void close(ChannelPromise promise);// 立即关闭,不触发任何事件void closeForcibly();// 注销,完成回调void deregister(ChannelPromise promise);// 开始读操作void beginRead();// 写操作void write(Object msg, ChannelPromise promise);// 冲刷所有的出站数据void flush();// 特殊的占位符,不接受通知ChannelPromise voidPromise();//写操作的出站缓冲区ChannelOutboundBuffer outboundBuffer();}

AbstractUnsafe基本抽象实现


属性

这些就是一些基本的属性,要进行数据的读写,需要有接收缓冲区,所以有了recvHandle处理器,写出去的时候需要有写缓冲区ChannelOutboundBuffer ,注意ChannelOutboundBuffer是初始化的时候就会创建,就创建一次。

//出站字节缓冲区private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);private RecvByteBufAllocator.Handle recvHandle;//接受数据缓冲分配器的处理器private boolean inFlush0;//是否正在缓冲/** true if the channel has never been registered, false otherwise */private boolean neverRegistered = true;//通道没注册过private void assertEventLoop() {//断言还没注册,或者当前线程是IO线程assert !registered || eventLoop.inEventLoop();}

recvBufAllocHandle接受缓冲区处理器

缓冲区分配上次说过了,出站缓冲区以前文章也有讲过。

@Overridepublic RecvByteBufAllocator.Handle recvBufAllocHandle() {if (recvHandle == null) {recvHandle = config().getRecvByteBufAllocator().newHandle();}return recvHandle;}

register注册到事件循环

注册方法其实就是判断是否当前线程就是IO线程,是的话就直接执行,不是就包装成一个任务提交给IO线程,这样就避免多线程的问题,始终是单线程操作。

@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) {//是否已经注册人到一个eventLooppromise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {//是否是NioEventLoop类型promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;
//只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里if (eventLoop.inEventLoop()) {register0(promise);} else {//否则就当做任务提交给eventLoop的任务队列try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}

register0执行注册逻辑

这里是注册过程要做的事,进行真正的注册逻辑doRegister,其实就是将NIO通道注册到Selector上,然后进行处理器的待添加事件的处理,注册回调成功,管道传递注册事件,如果是第一次注册,管道传递通道激活事件,否则是设置自动读的话就注册读监听。

private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {//确保是不可取消和通道打开着,否则就返回return;}boolean firstRegistration = neverRegistered;//设置注册标记doRegister();//进行注册逻辑neverRegistered = false;//AbstractUnsafe的已注册标记registered = true;//channel的已注册标记pipeline.invokeHandlerAddedIfNeeded();//如果在注册前有处理器添加,还没进行HandlerAdded回调,注册成功后要回调safeSetSuccess(promise);//回调注册成功pipeline.fireChannelRegistered();//通道注册事件传递if (isActive()) {//通道激活的话if (firstRegistration) {//第一次注册要进行激活事件传递pipeline.fireChannelActive();} else if (config().isAutoRead()) {//否则如果设置了自动读,就进行读监听beginRead();}}} catch (Throwable t) {closeForcibly();//强制关闭closeFuture.setClosed();//关闭回调safeSetFailure(promise, t);//设置失败}}

bind绑定地址

省略了部分。看主逻辑,做具体的doBind,如果通道开始没激活,绑定后激活的话,就开一个延时的任务,进行激活事件传递,最后回调绑定成功。

@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {...boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {//绑定前没激活,绑定后激活了invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}safeSetSuccess(promise);}

disconnect断开连接

调用doDisconnect,断开连接,如果开始激活的,断开后失效了,就传递失效事件。如果通道关闭了,还要处理关闭事件closeIfClosed

@Overridepublic final void disconnect(final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable()) {return;}boolean wasActive = isActive();try {doDisconnect();// Reset remoteAddress and localAddressremoteAddress = null;localAddress = null;} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (wasActive && !isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelInactive();}});}safeSetSuccess(promise);closeIfClosed(); // doDisconnect() might have closed the channel}

close关闭通道和出站缓冲区

进行通道的关闭,主要还是出站缓冲区的处理和传递通道失效和注销事件。

@Overridepublic final void close(final ChannelPromise promise) {assertEventLoop();ClosedChannelException closedChannelException = new ClosedChannelException();close(promise, closedChannelException, closedChannelException, false);}private void close(final ChannelPromise promise, final Throwable cause,final ClosedChannelException closeCause, final boolean notify) {if (!promise.setUncancellable()) {return;}if (closeInitiated) {//如果已经发起关闭了if (closeFuture.isDone()) {//判断是否关闭完成// Closed already.safeSetSuccess(promise);//回调} else if (!(promise instanceof VoidChannelPromise)) { closeFuture.addListener(new ChannelFutureListener() {//如果不是VoidChannelPromise,添加关闭监听@Overridepublic void operationComplete(ChannelFuture future) throws Exception {promise.setSuccess();}});}return;}closeInitiated = true;//已经开始关闭了//处理出站缓冲区关闭final boolean wasActive = isActive();final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.Executor closeExecutor = prepareToClose();if (closeExecutor != null) {closeExecutor.execute(new Runnable() {@Overridepublic void run() {try {// Execute the close.doClose0(promise);} finally {invokeLater(new Runnable() {@Overridepublic void run() {if (outboundBuffer != null) {// Fail all the queued messagesoutboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}fireChannelInactiveAndDeregister(wasActive);}});}}});} else {try {doClose0(promise);} finally {if (outboundBuffer != null) {// Fail all the queued messages.outboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}}if (inFlush0) {invokeLater(new Runnable() {@Overridepublic void run() {fireChannelInactiveAndDeregister(wasActive);}});} else {fireChannelInactiveAndDeregister(wasActive);}}}

doClose0关闭通道

具体的关闭逻辑和回调,具体逻辑是在通道中实现的,后面会讲。

private void doClose0(ChannelPromise promise) {try {doClose();closeFuture.setClosed();safeSetSuccess(promise);} catch (Throwable t) {closeFuture.setClosed();safeSetFailure(promise, t);}}

fireChannelInactiveAndDeregister传递通道失效和注销事件

传递通道失效和注销事件。

private void fireChannelInactiveAndDeregister(final boolean wasActive) {deregister(voidPromise(), wasActive && !isActive());}

doDeregister注销事件

提交一个任务,进行注销doDeregister,然后根据情况传递通道失效和注销事件。

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {if (!promise.setUncancellable()) {return;}if (!registered) {safeSetSuccess(promise);return;}invokeLater(new Runnable() {@Overridepublic void run() {try {doDeregister();} catch (Throwable t) {logger.warn("Unexpected exception occurred while deregistering a channel.", t);} finally {if (fireChannelInactive) {pipeline.fireChannelInactive();}if (registered) {registered = false;pipeline.fireChannelUnregistered();}safeSetSuccess(promise);}}});}

shutdownOutput出站缓冲区关闭处理

清理出站缓冲区ChannelOutboundBuffer ,并传递fireUserEventTriggered事件。

@UnstableApipublic final void shutdownOutput(final ChannelPromise promise) {assertEventLoop();shutdownOutput(promise, null);}private void shutdownOutput(final ChannelPromise promise, Throwable cause) {if (!promise.setUncancellable()) {return;}final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {//如果出站缓冲区为null的话,就回调失败promise.setFailure(new ClosedChannelException());return;}this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.禁止添加数据到出站缓冲区了final Throwable shutdownCause = cause == null ?//根据异常创建ChannelOutputShutdownExceptionnew ChannelOutputShutdownException("Channel output shutdown") :new ChannelOutputShutdownException("Channel output shutdown", cause);Executor closeExecutor = prepareToClose();//有关闭执行器if (closeExecutor != null) {//提交一个任务closeExecutor.execute(new Runnable() {@Overridepublic void run() {try {// Execute the shutdown.doShutdownOutput();promise.setSuccess();} catch (Throwable err) {promise.setFailure(err);} finally {//出站缓冲区事件任务// Dispatch to the EventLoopeventLoop().execute(new Runnable() {@Overridepublic void run() {//出站缓冲区事件处理closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);}});}}});} else {try {//直接处理关闭// Execute the shutdown.doShutdownOutput();promise.setSuccess();} catch (Throwable err) {promise.setFailure(err);} finally {closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);}}}private void closeOutboundBufferForShutdown(ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {buffer.failFlushed(cause, false);//不能冲刷buffer.close(cause, true);//关闭出站缓冲区pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);//传递事件}

好像有点长了,下一篇继续吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


推荐阅读
author-avatar
dnjaskn
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有