热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

吃透Netty源码系列十二之NioSocketChannelUnsafe详细介绍

吃透Netty源码系列十二之NioSocketChannelUnsafe详细介绍简单介绍write(Objectmsg,ChannelPromisepromise)filterOu

吃透Netty源码系列十二之NioSocketChannelUnsafe详细介绍

  • 简单介绍
  • write(Object msg, ChannelPromise promise)
    • filterOutboundMessage(Object msg)
      • AbstractNioChannel的newDirectBuffer(ByteBuf buf)
    • ChannelOutboundBuffer的addMessage
      • ChannelOutboundBuffer的incrementPendingOutboundBytes
      • ChannelOutboundBuffer的setUnwritable
  • flush()
    • ChannelOutboundBuffer的addFlush
    • AbstractNioUnsafe的flush0
      • AbstractNioUnsafe的isFlushPending
    • AbstractUnsafe的flush0
      • NioSocketChannel的doWrite(ChannelOutboundBuffer in)


简单介绍

这个类是比较核心的类,这个类定义了很多关键的操作,算比较底层的了,其实以前我们有讲过他的两个子类,一个就是AbstractNioMessageChannelNioMessageUnsafe,这个是用于NioServerSocketChannelunsafe类,另外一个AbstractNioByteChannelNioByteUnsafe,用于NioSocketChannel,其实也就是一些读取方法不一样,毕竟一个是取接受连接,一个是读取数据,不一样。我们前面其实有分析过一些他们的read方法。今天主要来分析下NioSocketChannelUnsafewriteflush方法,。
先看下这个NioSocketChannelUnsafe的结构,他是专门用来操作写和刷新的:
在这里插入图片描述

write(Object msg, ChannelPromise promise)

会将数据msg封装成直接缓冲区,然后添加到出站缓冲区中:

&#64;Overridepublic final void write(Object msg, ChannelPromise promise) {assertEventLoop();ChannelOutboundBuffer outboundBuffer &#61; this.outboundBuffer;if (outboundBuffer &#61;&#61; null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See https://github.com/netty/netty/issues/2362safeSetFailure(promise, newClosedChannelException(initialCloseCause));// release message now to prevent resource-leakReferenceCountUtil.release(msg);return;}int size;try {msg &#61; filterOutboundMessage(msg);//封装成直接缓冲区size &#61; pipeline.estimatorHandle().size(msg);//获取缓冲区大小if (size < 0) {size &#61; 0;}} catch (Throwable t) {safeSetFailure(promise, t);ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, size, promise);//往出站缓冲区添加消息}

filterOutboundMessage(Object msg)

这个是专门把数据封装成直接缓冲区&#xff0c;以便于进行零拷贝&#xff0c;利用堆外内存&#xff0c;提高效率&#xff0c;关于零拷贝&#xff0c;可以看看这篇文章&#xff0c;其实这里可以立即为没有用CPU将数据从内核拷贝到Java虚拟机中&#xff1a;

&#64;Overrideprotected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {ByteBuf buf &#61; (ByteBuf) msg;if (buf.isDirect()) {//如果是直接缓冲区就返回return msg;}return newDirectBuffer(buf);//否则封装成直接缓冲区就可以零拷贝}if (msg instanceof FileRegion) {//文件缓冲区也可以零拷贝return msg;}
//剩下的就不支持了throw new UnsupportedOperationException("unsupported message type: " &#43; StringUtil.simpleClassName(msg) &#43; EXPECTED_TYPES);}

AbstractNioChannel的newDirectBuffer(ByteBuf buf)

其实就是获得一个新的直接缓冲区&#xff0c;把旧的缓冲区释放了&#xff1a;

protected final ByteBuf newDirectBuffer(ByteBuf buf) {final int readableBytes &#61; buf.readableBytes();if (readableBytes &#61;&#61; 0) {//如果没有数据&#xff0c;就释放&#xff0c;返回一个空的ReferenceCountUtil.safeRelease(buf);return Unpooled.EMPTY_BUFFER;}
//字节缓冲区分配器final ByteBufAllocator alloc &#61; alloc();if (alloc.isDirectBufferPooled()) {//是直接缓冲区池化的ByteBuf directBuf &#61; alloc.directBuffer(readableBytes);//申请直接缓冲区directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);//写入直接缓冲区ReferenceCountUtil.safeRelease(buf);//释放原来的缓冲区return directBuf;//返回直接缓冲区}final ByteBuf directBuf &#61; ByteBufUtil.threadLocalDirectBuffer();//线程中的直接缓冲区if (directBuf !&#61; null) {directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);ReferenceCountUtil.safeRelease(buf);return directBuf;}// Allocating and deallocating an unpooled direct buffer is very expensive; give up.return buf;//如果申请或者释放未池化的直接缓冲区消耗太大&#xff0c;就直接返回原来的}

ChannelOutboundBuffer的addMessage

这里才是将直接缓冲区添加到出站缓冲区中&#xff0c;不过是会创建一个实体Entry&#xff0c;然后用一个单链表结构来存取的&#xff0c;这个后面会讲&#xff1a;

public void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry &#61; Entry.newInstance(msg, size, total(msg), promise);//创建实体if (tailEntry &#61;&#61; null) {flushedEntry &#61; null;} else {Entry tail &#61; tailEntry;tail.next &#61; entry;}tailEntry &#61; entry;if (unflushedEntry &#61;&#61; null) {unflushedEntry &#61; entry;//指向第一个未冲刷的实体}// increment pending bytes after adding message to the unflushed arrays.// See https://github.com/netty/netty/issues/1619incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息}

链表结构大致是这样&#xff1a;
在这里插入图片描述

ChannelOutboundBuffer的incrementPendingOutboundBytes

这个就是增加待出站的字节数&#xff0c;如果超过上限的话&#xff0c;就设置不可以写了&#xff1a;

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {if (size &#61;&#61; 0) {return;}long newWriteBufferSize &#61; TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小setUnwritable(invokeLater);//设置不可写}}

ChannelOutboundBuffer的setUnwritable

我们先来看看可写是怎么判断的&#xff1a;

public boolean isWritable() {return unwritable &#61;&#61; 0;}

然后设置不可写就是这样&#xff0c;将unwritable 原子操作改为非0&#xff0c;然后触发fireChannelWritabilityChanged&#xff0c;也就是写能力改变了&#xff0c;不可写了&#xff1a;

private void setUnwritable(boolean invokeLater) {for (;;) {final int oldValue &#61; unwritable;//可写的时候是0final int newValue &#61; oldValue | 1;if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue &#61;&#61; 0 && newValue !&#61; 0) {//开始是0&#xff0c;后来变为非0&#xff0c;就是不可写了fireChannelWritabilityChanged(invokeLater);}break;}}}

如果是立即改变&#xff0c;就会调用pipeline.fireChannelWritabilityChanged();&#xff0c;就会从头结点开始传递这个事件&#xff0c;否则就给通道的事件循环提交个任务&#xff1a;

private void fireChannelWritabilityChanged(boolean invokeLater) {final ChannelPipeline pipeline &#61; channel.pipeline();if (invokeLater) {Runnable task &#61; fireChannelWritabilityChangedTask;if (task &#61;&#61; null) {fireChannelWritabilityChangedTask &#61; task &#61; new Runnable() {&#64;Overridepublic void run() {pipeline.fireChannelWritabilityChanged();}};}channel.eventLoop().execute(task);} else {pipeline.fireChannelWritabilityChanged();}}

至此&#xff0c;write写数据就完成了&#xff0c;其实就是写入出站缓冲区里面&#xff0c;并没有将数据冲刷到对端&#xff0c;要进行flush才会将数据发出去。

flush()

可以看到这里才会开始冲刷&#xff0c;会先进行打标记&#xff0c;然后冲刷&#xff1a;

&#64;Overridepublic final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer &#61; this.outboundBuffer;//获得出站缓冲区if (outboundBuffer &#61;&#61; null) {return;}outboundBuffer.addFlush();//添加冲刷计数flush0();//冲刷}

ChannelOutboundBuffer的addFlush

这里会将flushedEntry设置为要冲刷的第一个entry&#xff0c;然后遍历链表&#xff0c;冲刷计数flushed&#43;1&#xff0c;如果此时请求取消的话&#xff0c;就进行取消和出站字节数的减少&#xff0c;最后将为冲刷实体unflushedEntry设为空&#xff0c;表示这些都已经要冲刷的了&#xff0c;后续会根据flushed来进行冲刷&#xff1b;

public void addFlush() {Entry entry &#61; unflushedEntry;//第一个没冲刷的数据&#xff0c;也是链表的第一个if (entry !&#61; null) {//有数据才刷了if (flushedEntry &#61;&#61; null) {// there is no flushedEntry yet, so start with the entryflushedEntry &#61; entry;//设置第一个要冲刷的实体}do {flushed &#43;&#43;;//冲刷数&#43;1if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存// Was cancelled so make sure we free up memory and notify about the freed bytesint pending &#61; entry.cancel();decrementPendingOutboundBytes(pending, false, true);}entry &#61; entry.next;} while (entry !&#61; null);//遍历冲刷是否有取消的// All flushed so reset unflushedEntryunflushedEntry &#61; null;//重置未冲刷的}}

AbstractNioUnsafe的flush0

要冲刷了&#xff1a;

&#64;Overrideprotected final void flush0() {if (!isFlushPending()) {//没有待冲刷的操作super.flush0();}}

AbstractNioUnsafe的isFlushPending

先判断下是否已经有待冲刷存在&#xff0c;也就是有设置OP_WRITE事件&#xff1a;

private boolean isFlushPending() {SelectionKey selectionKey &#61; selectionKey();return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) !&#61; 0;}

AbstractUnsafe的flush0

这里就要开始真正的冲刷了,省略了非核心的操作&#xff1a;

protected void flush0() {...doWrite(outboundBuffer);...}

NioSocketChannel的doWrite(ChannelOutboundBuffer in)

最终当然还是封装了NIOSocketChannelwrite方法来进行写数据啦&#xff0c;他会进行16次自旋尝试&#xff0c;来写消息&#xff0c;直到出站缓冲区的数据全部写出去了&#xff0c;然后就clearOpWrite清除OP_WRITE设置&#xff0c;返回&#xff0c;否则要去设置任务是否写操作incompleteWrite&#xff1a;

&#64;Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {SocketChannel ch &#61; javaChannel();//内部还是用NIO的操作的int writeSpinCount &#61; config().getWriteSpinCount();//写自旋的次数&#xff0c;默认是16次do {if (in.isEmpty()) {// All written so clear OP_WRITEclearOpWrite();//全部写完后&#xff0c;进行写事件的清除// Directly return here so incompleteWrite(...) is not called.return;//直接返回&#xff0c;不需要调用incompleteWrite}// Ensure the pending writes are made of ByteBufs only.int maxBytesPerGatheringWrite &#61; ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();//获取最大的待写字节数16384ByteBuffer[] nioBuffers &#61; in.nioBuffers(1024, maxBytesPerGatheringWrite);//获取ByteBuffer数组int nioBufferCnt &#61; in.nioBufferCount();// Always us nioBuffers() to workaround data-corruption.// See https://github.com/netty/netty/issues/2761switch (nioBufferCnt) {case 0:// We have something else beside ByteBuffers to write so fallback to normal writes.writeSpinCount -&#61; doWrite0(in);break;case 1: {// Only one ByteBuf so use non-gathering write 一个就不用gathering// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero. 0字节的不会放进缓冲区里&#xff0c;不用检查ByteBuffer buffer &#61; nioBuffers[0];//只有一个int attemptedBytes &#61; buffer.remaining();final int localWrittenBytes &#61; ch.write(buffer);//用NIO的SocketChannel写出去if (localWrittenBytes <&#61; 0) {incompleteWrite(true);return;}adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}default: {// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero.// We limit the max amount to int above so cast is safelong attemptedBytes &#61; in.nioBufferSize();//获取发多少字节数据final long localWrittenBytes &#61; ch.write(nioBuffers, 0, nioBufferCnt);//一起写出去了if (localWrittenBytes <&#61; 0) {//没完成incompleteWrite(true);return;}// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}}} while (writeSpinCount > 0);incompleteWrite(writeSpinCount < 0);}

上面这个方法涉及到的东西比较多&#xff0c;讲起来东西很多&#xff0c;因为还涉及到ChannelOutboundBuffer&#xff0c;还有一些ByteBuf相关的东西等等&#xff0c;下一篇详细的来介绍下这些东西&#xff0c;不然很难讲清楚doWrite

好了&#xff0c;今天就到这里了&#xff0c;希望对学习理解有帮助&#xff0c;大神看见勿喷&#xff0c;仅为自己的学习理解&#xff0c;能力有限&#xff0c;请多包涵。


推荐阅读
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 怀疑是每次都在新建文件,具体代码如下 ... [详细]
  • Oracle seg,V$TEMPSEG_USAGE与Oracle排序的关系及使用方法
    本文介绍了Oracle seg,V$TEMPSEG_USAGE与Oracle排序之间的关系,V$TEMPSEG_USAGE是V_$SORT_USAGE的同义词,通过查询dba_objects和dba_synonyms视图可以了解到它们的详细信息。同时,还探讨了V$TEMPSEG_USAGE的使用方法。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • 本文介绍了MVP架构模式及其在国庆技术博客中的应用。MVP架构模式是一种演变自MVC架构的新模式,其中View和Model之间的通信通过Presenter进行。相比MVC架构,MVP架构将交互逻辑放在Presenter内部,而View直接从Model中读取数据而不是通过Controller。本文还探讨了MVP架构在国庆技术博客中的具体应用。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 在C#中,使用关键字abstract来定义抽象类和抽象方法。抽象类是一种不能被实例化的类,它只提供部分实现,但可以被其他类继承并创建实例。抽象类可以用于类、方法、属性、索引器和事件。在一个类声明中使用abstract表示该类倾向于作为其他类的基类成员被标识为抽象,或者被包含在一个抽象类中,必须由其派生类实现。本文介绍了C#中抽象类和抽象方法的基础知识,并提供了一个示例代码。 ... [详细]
  • PHP反射API的功能和用途详解
    本文详细介绍了PHP反射API的功能和用途,包括动态获取信息和调用对象方法的功能,以及自动加载插件、生成文档、扩充PHP语言等用途。通过反射API,可以获取类的元数据,创建类的实例,调用方法,传递参数,动态调用类的静态方法等。PHP反射API是一种内建的OOP技术扩展,通过使用Reflection、ReflectionClass和ReflectionMethod等类,可以帮助我们分析其他类、接口、方法、属性和扩展。 ... [详细]
  • php缓存ri,浅析ThinkPHP缓存之快速缓存(F方法)和动态缓存(S方法)(日常整理)
    thinkPHP的F方法只能用于缓存简单数据类型,不支持有效期和缓存对象。S()缓存方法支持有效期,又称动态缓存方法。本文是小编日常整理有关thinkp ... [详细]
author-avatar
mobiledu2502930997
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有