作者:手哥2502852243 | 来源:互联网 | 2023-09-17 13:57
0.NioEventLoopGroup简介NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个
0. NioEventLoopGroup简介
NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。
1. NioEventLoopGroup类图
2. 构造方法
new NioEventLoopGroup()方法会调用到MultithreadEventLoopGroup的构造方法:
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));//默认值为系统core数的两倍
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//如果采用无参的构造函数,传入的nThreads变量为0,此时线程数会被设置为系统core数*2
}
然后会调用MultithreadEventExecutorGroup的构造方法:
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];//设定线程池大小
for (int i = 0; i ) {
boolean success = false;
try {
children[i] = newChild(executor, args);//新建nThreads个子线程
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {//如果新建子线程的过程中出错,则关闭所有子线程
for (int j = 0; j ) {
children[j].shutdownGracefully();
}
for (int j = 0; j ) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);//设定新任务的分配策略
final FutureListener terminatiOnListener= new FutureListener() {//注册一些回调函数用于清理工作
@Override
public void operationComplete(Future future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set childrenSet = new LinkedHashSet(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
其中比较重要的地方有两处:调用子类实现的newChild方法设置子线程,以及设置新任务的分配策略
先看一下NioEventLoopGroup中实现的newChild方法:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
很简单的新建一个NioEventLoop对象并返回,我们下一节会介绍NioEventLoop
而chooserFactory.newChooser最终会跳转到DefaultEventExecutorChooserFactory里:
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
虽然分配策略都是round-robin,但是在子线程的数量为2的幂时,可以用位运算来加速,效率很高。
Netty为了追求效率确实不择手段。