Channel
创建再次回顾一下Channel
的流程吧。
bind
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
bind2
/**
 * Binds to the given local address.
 *
 * Calls validate() first, then rejects a null address, and finally hands the
 * address to doBind(), whose ChannelFuture is returned as-is.
 *
 * @param localAddress the address to bind to; must not be null
 * @return the future returned by doBind(localAddress)
 * @throws NullPointerException if localAddress is null
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
}
initAndRegister
final ChannelFuture initAndRegister() {
...
channel = channelFactory.newChannel();
...
}
newChannel
默认采用的是反射进行实例化。
public T newChannel() {
...
return clazz.newInstance();
}
NioServerSocketChannel
使用的就是传入的
NioServerSocketChannel
类,反射默认调用的是无参构造。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket
private static ServerSocketChannel newSocket(SelectorProvider provider) { .... return provider.openServerSocketChannel(); }
此方法返回的,就是原生的
NIO
的ServerSocketChannel
了
NioServerSocketChannel2
实际调用的构造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
此时已经传入
OP_ACCEPT
事件监听
Channel
实例信息NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
super
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
super
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
...
}
可以看到,
NIO
的真实ServerSocketChannel
作为内部对象,被封装了。
super
/**
 * Base constructor of the channel hierarchy.
 *
 * Stores the parent channel and then creates the three per-channel members
 * (id, unsafe, pipeline) through the newId() / newUnsafe() /
 * newChannelPipeline() factory methods, which are not shown in this excerpt.
 *
 * @param parent the parent channel, stored without any null check
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
NioServerSocketChannel
的id
、unsafe和pipeline
此时被创建和添加,具体的创建办法可以继续跟踪。非主要目的,先到此为止。
Channel
配置信息NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
config
之前,先看看javaChannel().socket()
javaChannel
protected ServerSocketChannel javaChannel() { return (ServerSocketChannel) super.javaChannel(); }
super.javaChannel()
protected SelectableChannel javaChannel() { return ch; }
之前已经看到了,创建的
ServerSocketChannel
被赋值给了ch
,这一步就是获取原来的ch
。类型变化?继承关系如下
javaChannel
也就是JDK
原生的channel
了。
javaChannel().socket()
就是原生
ServerSocketChannel
获取socket
对象了,不是新东西。
NioServerSocketChannelConfig
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);
}
super
/**
 * Creates the config for a server socket channel.
 *
 * The channel is passed up to the super constructor before the socket is
 * validated, so the null check on javaSocket runs after super(channel).
 *
 * @param channel    the channel this config belongs to
 * @param javaSocket the underlying java.net.ServerSocket; must not be null
 * @throws NullPointerException if javaSocket is null
 */
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel);
    if (javaSocket == null) {
        throw new NullPointerException("javaSocket");
    }
    // Keep a reference to the socket for later use by this config.
    this.javaSocket = javaSocket;
}
super
之后把javaSocket
带过来,旧东西不管,继续super
super
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
记住
AdaptiveRecvByteBufAllocator
,这是需要详细追查的。为了流程完整性,我们继续深入。
DefaultChannelConfig
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
setRecvByteBufAllocator(allocator, channel.metadata());
this.channel = channel;
}
setRecvByteBufAllocator
/**
 * Installs the receive-buffer allocator, seeding its per-read message limit
 * from the channel metadata when the allocator supports one.
 *
 * @param allocator the allocator to install; must not be null
 * @param metadata  supplies defaultMaxMessagesPerRead() for allocators that
 *                  implement MaxMessagesRecvByteBufAllocator
 * @throws NullPointerException if allocator is null
 */
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
    } else if (allocator == null) {
        // instanceof is false for null, so a null allocator always lands here.
        throw new NullPointerException("allocator");
    }
    // Delegate the actual assignment to the single-argument overload.
    setRecvByteBufAllocator(allocator);
}
setRecvByteBufAllocator
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
rcvBufAllocator = checkNotNull(allocator, "allocator");
return this;
}
也就是说,我们配置最关键的,就是DefaultChannelConfig新增的额外参数,
allocator
。
Allocator
前面已经说了,这个就是重点。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
大小适配策略
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
/**
 * Creates an allocator whose buffer-size guesses are bounded by the given limits.
 *
 * The byte limits are translated into indexes of SIZE_TABLE; the initial size
 * is stored unchanged.
 *
 * @param minimum lower bound in bytes; must be greater than 0
 * @param initial initial size in bytes; must not be smaller than minimum
 * @param maximum upper bound in bytes; must not be smaller than initial
 * @throws IllegalArgumentException if any of the three constraints is violated
 */
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }
    int minIndex = getSizeTableIndex(minimum);
    // If the slot found holds a value below the requested minimum, move one slot up.
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }
    int maxIndex = getSizeTableIndex(maximum);
    // If the slot found holds a value above the requested maximum, move one slot down.
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }
    this.initial = initial;
}
// Builds SIZE_TABLE, the ascending list of candidate buffer sizes.
static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    // Small sizes: 16, 32, ..., 496 in steps of 16.
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }
    // Large sizes: 512, 1024, ... doubling each time. The loop ends when the
    // left shift overflows int and i is no longer positive.
    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }
    // Copy the boxed list into a primitive int array.
    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}
可以分析出
sizeTable
的值为
- $16n,\ n \in [1, 31]$,即 16, 32, …, 496(步长 16,小于 512);
- $2^{n},\ n \in [9, 30]$,即 512, 1024, …, $2^{30}$(从 512 起每次翻倍,直到 int 溢出为止)。
getSizeTableIndex
/**
 * Binary-searches SIZE_TABLE for the slot that corresponds to the given size.
 *
 * Returns the index of an entry equal to size when one is hit; when size lies
 * strictly between two neighbouring entries, returns the index of the larger
 * one. When the search range collapses, the remaining bound is returned.
 *
 * @param size the size in bytes to look up
 * @return an index into SIZE_TABLE
 */
private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        // Range exhausted or narrowed to a single slot: return that bound.
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        // '+' binds tighter than '>>>', so this is (low + high) >>> 1.
        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        // low < high here, so mid < high and mid + 1 stays within the range.
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            // a < size <= b: pick the larger neighbour.
            return mid + 1;
        }
    }
}
此方法,其实通过
size
比对,用二分查找找到一个等于size,或者最接近并大于size
的在SIZE_TABLE
中的值的下标并返回。有参构造调用,就是通过传入的限定字节大小,来确定
SIZE_TABLE
的index
范围。
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
默认情况下,对最小,最大,初始化的缓冲区大小已经设置了默认值。
可以看到,类是
public
的,但这三个变量是static final
的常量,运行时不能重新赋值,所以无法通过修改静态变量来调整默认大小。要调整接收缓冲的大小,应该用有参构造创建一个新的分配器,再通过 ChannelOption.RCVBUF_ALLOCATOR(或 ChannelConfig.setRecvByteBufAllocator)设置进去,例如:new AdaptiveRecvByteBufAllocator(64, 1024, 65536)
SIZE_TABLE
size
锁定范围,返回目标坐标public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
public int guess() {
return nextReceiveBufferSize;
}
通过guess
就可以获取该分配的大小了。
allocate
private final class HandleImpl extends MaxMessageHandle
这个信息得追到
MaxMessageHandle
才能看到。public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }
在某个还不知道的时候,就是调用它创建缓冲的。
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
缓冲大小策略明白了,它怎么创建的呢。
ioBuffer
PreferHeapByteBufAllocator
public ByteBuf ioBuffer(int initialCapacity) { return allocator.heapBuffer(initialCapacity); }
这个
allocator
直接就是heapBuffer
,就不管了。
AbstractByteBufAllocator
/**
 * Allocates a buffer intended for I/O with the given initial capacity.
 *
 * Chooses a direct buffer when PlatformDependent.hasUnsafe() reports true,
 * otherwise falls back to a heap buffer.
 *
 * @param initialCapacity the initial capacity passed to the chosen factory method
 * @return the buffer from directBuffer(...) or heapBuffer(...)
 */
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}
此时,就会有两种buffer
的选择创建,基于效率考量,我们肯定优先零拷贝的directBuffer
。
PlatformDependent.hasUnsafe()
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
HAS_UNSAFE
private static final boolean HAS_UNSAFE = hasUnsafe0();
hasUnsafe0
/**
 * Computes the value cached in HAS_UNSAFE.
 *
 * The checks run in order: Android, explicit opt-out, then the actual probe.
 * Any Throwable raised by the probe is treated as "unavailable".
 *
 * @return true only if none of the vetoes apply and PlatformDependent0.hasUnsafe() is true
 */
private static boolean hasUnsafe0() {
    // 1. Android is always reported as unavailable.
    if (isAndroid()) {
        logger.debug("sun.misc.Unsafe: unavailable (Android)");
        return false;
    }

    // 2. Explicitly disabled by the user.
    if (PlatformDependent0.isExplicitNoUnsafe()) {
        return false;
    }

    // 3. Ask PlatformDependent0; a failure here must not propagate.
    try {
        boolean hasUnsafe = PlatformDependent0.hasUnsafe();
        logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
        return hasUnsafe;
    } catch (Throwable ignored) {
        return false;
    }
}
追踪过程简单,我就直接到具体步骤了
android
/**
 * Detects Android by trying to load the class android.app.Application.
 *
 * @return true if the class can be loaded, false if loading throws anything
 */
private static boolean isAndroid0() {
    boolean android;
    try {
        // Second argument false: load the class without initializing it.
        Class.forName("android.app.Application", false, getSystemClassLoader());
        android = true;
    } catch (Throwable ignored) {
        // Class not found (or any other failure) means this is not Android.
        android = false;
    }
    return android;
}
通过类名
android.app.Application
判断系统
explicitNoUnsafe
/**
 * Reports whether the use of sun.misc.Unsafe has been disabled through system properties.
 *
 * Properties consulted, in order:
 * io.netty.noUnsafe (default false), then io.netty.tryUnsafe (default true) if
 * that key is present, otherwise org.jboss.netty.tryUnsafe (default true).
 *
 * @return true if Unsafe is explicitly disabled, false otherwise
 */
private static boolean explicitNoUnsafe0() {
    final boolean noUnsafe = SystemPropertyUtil.getBoolean("io.netty.noUnsafe", false);
    if (noUnsafe) {
        logger.debug("sun.misc.Unsafe: unavailable (io.netty.noUnsafe)");
        return true;
    }

    // Pick the property to read: the io.netty key wins when it is set at all,
    // otherwise the org.jboss.netty key is read.
    boolean tryUnsafe;
    if (SystemPropertyUtil.contains("io.netty.tryUnsafe")) {
        tryUnsafe = SystemPropertyUtil.getBoolean("io.netty.tryUnsafe", true);
    } else {
        tryUnsafe = SystemPropertyUtil.getBoolean("org.jboss.netty.tryUnsafe", true);
    }

    if (!tryUnsafe) {
        logger.debug("sun.misc.Unsafe: unavailable (io.netty.tryUnsafe/org.jboss.netty.tryUnsafe)");
        return true;
    }

    // Nothing disabled it.
    return false;
}
- 优先查看系统属性
io.netty.noUnsafe
,默认false
- 然后查看系统属性
io.netty.tryUnsafe
,默认true
- 不包含
io.netty.tryUnsafe
则查看org.jboss.netty.tryUnsafe
,默认true
。- 以上都没有禁用时,方法返回
false
我们要绕过这一层屏蔽使用
directBuffer
,需要这样System.setProperty("io.netty.noUnsafe","false"); System.setProperty("io.netty.tryUnsafe","true");
hasUnsafe
static boolean hasUnsafe() { return UNSAFE != null; }
UNSAFE
这个字段呢,静态代码块里面有判断逻辑。总的来说,就是检查控制和支不支持。
有兴趣可以仔细分析一下,我能力有限,嘿嘿。
具体创建呢,就不用追了,类型大概也就是这两种。
Channel
过程,封装原生ServerSocketChannel
,以及扩展id
和pipeline
。更多封装信息?更多配置信息?我还不行,慢慢加深呗。