
Netty Source Code Analysis: Core Thread Processing


Source: https://github.com/netty/netty


The core thread is NioEventLoop. Its thread is started the first time a task is added to the task queue.

    // Class declaration (abridged excerpt)
    abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor

    // Abridged excerpt: the Runnable handed to the new thread enters the executor's run() loop
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            SingleThreadEventExecutor.this.run();
            success = true;
        }
    });
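
The lazy start described above can be sketched in isolation. This is a minimal illustration, not Netty's implementation: the class `LazyStartExecutor` and all of its members are hypothetical names, and the loop is reduced to a blocking queue. It only shows the idea that the worker thread is created on the first `execute` call and that a compare-and-set keeps it from being started twice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; none of these names are part of Netty.
class LazyStartExecutor {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger threadsStarted = new AtomicInteger();

    // Enqueue the task, then start the worker thread only if nobody has started it yet.
    void execute(Runnable task) {
        taskQueue.add(task);
        if (started.compareAndSet(false, true)) {
            threadsStarted.incrementAndGet();
            Thread worker = new Thread(this::runLoop, "lazy-event-loop");
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Simplified loop: block on the queue and run tasks one by one.
    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isStarted() {
        return started.get();
    }

    int threadsStarted() {
        return threadsStarted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStartExecutor executor = new LazyStartExecutor();
        System.out.println("started before first task: " + executor.isStarted());
        CountDownLatch done = new CountDownLatch(2);
        executor.execute(done::countDown);
        executor.execute(done::countDown);
        done.await(5, TimeUnit.SECONDS);
        System.out.println("threads started: " + executor.threadsStarted());
    }
}
```

Submitting a second task reuses the already running thread, which is why the counter stays at one.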


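After the task has been added, the wake-up flag is set to true. The caller at this point is the main thread rather than the event loop thread, so inEventLoop is false and the selector is woken from its blocking wait.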

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }

    protected boolean wakesUpForTask(Runnable task) {
        return !(task instanceof NonWakeupRunnable);
    }

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
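
To make the effect of the compare-and-set guard concrete, here is a small sketch under stated assumptions. `WakeupGuard` is a hypothetical stand-in: it counts how often the selector would have been woken instead of touching a real `Selector`, and `resetForNextSelect` stands in for the `wakenUp.getAndSet(false)` call made at the start of a select round.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the wake-up guard; it counts calls instead of using a real Selector.
class WakeupGuard {
    private final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private final AtomicInteger selectorWakeups = new AtomicInteger();

    // Same shape as wakeup(boolean): only an outside thread that wins the CAS wakes the selector.
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selectorWakeups.incrementAndGet(); // stands in for selector.wakeup()
        }
    }

    // Stands in for wakenUp.getAndSet(false) at the start of a select round.
    boolean resetForNextSelect() {
        return wakenUp.getAndSet(false);
    }

    int selectorWakeups() {
        return selectorWakeups.get();
    }

    public static void main(String[] args) {
        WakeupGuard guard = new WakeupGuard();
        guard.wakeup(false); // wins the CAS
        guard.wakeup(false); // flag already true, skipped
        guard.wakeup(true);  // called from the event loop, skipped
        System.out.println(guard.selectorWakeups()); // prints 1
        guard.resetForNextSelect();
        guard.wakeup(false);
        System.out.println(guard.selectorWakeups()); // prints 2
    }
}
```

However many tasks are submitted between two select rounds, at most one of them pays for the wake-up call.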


The concrete run() implementation is:

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }
                // I/O processing and task execution follow here (excerpted further below).
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }


hasTasks() checks whether the task queue holds any tasks waiting to run:

    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }


calculateStrategy() computes the strategy value handed back to the loop:

    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
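
The decision made by calculateStrategy() can be tried out on its own. The sketch below is an illustration under assumptions: `SelectStrategySketch` and `branchFor` are hypothetical names, `java.util.function.IntSupplier` replaces Netty's own `IntSupplier` type, and the numeric values chosen for `SELECT` and `CONTINUE` are assumptions of this sketch (any negative values that cannot collide with a ready-key count would do).

```java
import java.util.function.IntSupplier;

// Hypothetical sketch; constants and helper names are assumptions, not Netty's API.
class SelectStrategySketch {
    static final int SELECT = -1;   // assumed value: no tasks pending, do a blocking select
    static final int CONTINUE = -2; // assumed value: restart the loop

    // With pending tasks, ask the non-blocking supplier; otherwise request a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // Describes which branch of the run() switch a strategy value would take.
    static String branchFor(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "continue";
            case SELECT:
                return "blocking select";
            default:
                return "process " + strategy + " ready key(s) and run tasks";
        }
    }

    public static void main(String[] args) {
        // No tasks: the supplier is never consulted.
        System.out.println(branchFor(calculateStrategy(() -> 3, false))); // prints "blocking select"
        // Tasks pending: the non-blocking result (here 3 ready keys) is used directly.
        System.out.println(branchFor(calculateStrategy(() -> 3, true)));
    }
}
```

Because a non-blocking select never returns a negative count, a result obtained while tasks are pending always reaches the `default` branch.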


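If tasks are pending, the loop first checks, without blocking, whether any network events need handling. selectNow() returns the number of ready keys, which is greater than 0 when there is I/O to process.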

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }
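
The reason for the "restore wakeup state" step can be observed with the JDK selector directly: `Selector.selectNow()` clears the effect of an earlier `wakeup()`, so a pending wake-up has to be issued again if it should still cut the next blocking select short. The following sketch uses only `java.nio`; the class and method names are hypothetical, and the timings it prints depend on the machine.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it exercises only the JDK Selector, not Netty.
class SelectNowWakeupDemo {
    // Returns how long select(timeoutMillis) blocks after wakeup() followed by selectNow().
    static long blockedMillis(boolean restoreWakeup, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            selector.selectNow();      // clears the pending wake-up
            if (restoreWakeup) {
                selector.wakeup();     // what the finally block does when wakenUp is true
            }
            long start = System.nanoTime();
            selector.select(timeoutMillis);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    public static void main(String[] args) throws IOException {
        // Without the restore, select waits for (roughly) the whole timeout.
        System.out.println("not restored: ~" + blockedMillis(false, 300) + " ms");
        // With the restore, select returns almost immediately despite the long timeout.
        System.out.println("restored:     ~" + blockedMillis(true, 5000) + " ms");
    }
}
```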


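The loop then acts on the strategy value. SelectStrategy.CONTINUE restarts the loop and evaluates the strategy again; when no tasks are pending, SelectStrategy.SELECT is returned and select() is called. select() derives the selector timeout from the delay of the next scheduled task, re-checks for pending tasks and the wake-up state before blocking, and then blocks in select(timeoutMillis). When the call returns, it checks whether any keys were selected, whether a wake-up had already been requested before the call (oldWakenUp), the current wake-up state, and whether ordinary or scheduled tasks are pending.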

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
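
The timeout computation at the top of the select() loop rounds the remaining nanoseconds to the nearest millisecond by adding half a millisecond before the integer division. A small worked example, with the hypothetical helper `timeoutMillis` standing in for that single expression:

```java
// Hypothetical helper class that isolates the timeout arithmetic from select().
class SelectTimeoutSketch {
    // Rounds the time remaining until the deadline to the nearest millisecond.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        System.out.println(timeoutMillis(now + 400_000L, now));   // prints 0: less than 0.5 ms left
        System.out.println(timeoutMillis(now + 500_000L, now));   // prints 1: exactly 0.5 ms rounds up
        System.out.println(timeoutMillis(now + 1_499_999L, now)); // prints 1
        System.out.println(timeoutMillis(now + 1_500_000L, now)); // prints 2
    }
}
```

A result of 0 or less means the next scheduled task is due within half a millisecond, which is the case where the loop falls back to the non-blocking selectNow() instead of blocking.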


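The loop also detects whether NIO has run into the empty-spin problem, that is, the selector repeatedly returning before the timeout has elapsed without anything selected. The default threshold is 512 consecutive premature returns, after which the selector is rebuilt.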

    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }
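
The premature-return counting that triggers rebuildSelector() can be modelled without a real selector. The sketch below is a simplified assumption-laden model: `PrematureReturnDetector` is a hypothetical class, timestamps are passed in explicitly, and only the counter logic of select() is kept (a return that took at least the full timeout resets the counter to 1, anything shorter counts as premature).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the selectCnt bookkeeping; not part of Netty.
class PrematureReturnDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    PrematureReturnDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Call after each select(timeoutMillis) that selected nothing.
    // Returns true when the selector should be rebuilt.
    boolean onSelectReturned(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            selectCnt = 1; // the full timeout elapsed: a normal return
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 1; // too many premature returns in a row
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PrematureReturnDetector detector = new PrematureReturnDetector(3);
        long oneSecond = TimeUnit.SECONDS.toNanos(1);
        System.out.println(detector.onSelectReturned(0, 10, 1000));        // false (1st premature)
        System.out.println(detector.onSelectReturned(0, 10, 1000));        // false (2nd premature)
        System.out.println(detector.onSelectReturned(0, oneSecond, 1000)); // false (normal, counter reset)
        System.out.println(detector.onSelectReturned(0, 10, 1000));        // false
        System.out.println(detector.onSelectReturned(0, 10, 1000));        // true  (threshold reached)
    }
}
```

A threshold of 3 is used here only to keep the example short; the text above gives 512 as the default.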


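rebuildSelector0() re-registers every key of the old selector on the new selector, together with its interest ops and attachment, and finally closes the old selector.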

    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
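
The key migration can be reproduced with plain `java.nio`, which may help when reading the Netty version. This is a reduced sketch, not the method above: `SelectorMigrationSketch.migrate` is a hypothetical helper, error handling and logging are left out, and a `Pipe` is used as the registered channel so that no network socket is needed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical helper showing the re-registration steps with JDK classes only.
class SelectorMigrationSketch {
    // Re-registers every valid key of oldSelector on a fresh selector, then closes the old one.
    static Selector migrate(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // already cancelled, or already registered on the new selector
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        Selector oldSelector = Selector.open();
        pipe.source().register(oldSelector, SelectionKey.OP_READ, "my-attachment");

        Selector newSelector = migrate(oldSelector);
        SelectionKey newKey = pipe.source().keyFor(newSelector);
        System.out.println("attachment kept: " + newKey.attachment());

        // The channel is still selectable through the new selector.
        pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
        System.out.println("ready keys: " + newSelector.select(2000));

        newSelector.close();
        pipe.sink().close();
        pipe.source().close();
    }
}
```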


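Once the selector wait ends, the loop processes I/O events and runs tasks. ioRatio controls the share of time spent on network I/O; the I/O events are processed first.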

    cancelledKeys = 0;
    needsToSelectAgain = false;
    final int ioRatio = this.ioRatio;
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            runAllTasks();
        }
    } else {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
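
The task budget passed to runAllTasks(long) follows from treating ioRatio as the percentage of loop time given to I/O: if the I/O phase took ioTime, tasks get ioTime * (100 - ioRatio) / ioRatio. A short worked example, with `taskBudgetNanos` as a hypothetical helper wrapping just that expression:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper isolating the ioRatio arithmetic.
class IoRatioSketch {
    // Time budget for tasks, given the time spent on I/O and an I/O percentage between 1 and 99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = TimeUnit.MILLISECONDS.toNanos(8);
        System.out.println(taskBudgetNanos(ioTime, 50)); // prints 8000000:  equal shares
        System.out.println(taskBudgetNanos(ioTime, 80)); // prints 2000000:  80% I/O, 20% tasks
        System.out.println(taskBudgetNanos(ioTime, 20)); // prints 32000000: 20% I/O, 80% tasks
    }
}
```

The value 100 is handled separately in the excerpt above: no budget is computed and runAllTasks() runs without a time limit.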


After the I/O events have been handled, the allotted time is spent on running tasks:

    protected boolean runAllTasks() {
        boolean fetchedAll;
        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }

            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }

                task = pollTask();
                if (task == null) {
                    break;
                }
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        lastExecutionTime = ScheduledFutureTask.nanoTime();
        return true;
    }
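
The inner drain loop of runAllTasks() has one property worth seeing in action: a task that throws is logged and skipped, and the remaining tasks still run. The sketch below is a simplified assumption: `TaskDrainSketch` is a hypothetical class, the scheduled-task queue is left out, and the failure is written to standard error instead of a logger.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the drain loop; scheduled tasks are not modelled.
class TaskDrainSketch {
    // Runs every queued task; a failing task is reported but does not stop the loop.
    // Returns false if the queue was empty to begin with.
    static boolean runAllTasks(Queue<Runnable> taskQueue) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return false;
        }
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("A task raised an exception: " + t);
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        AtomicInteger ran = new AtomicInteger();
        queue.add(ran::incrementAndGet);
        queue.add(() -> { throw new IllegalStateException("boom"); });
        queue.add(ran::incrementAndGet);

        System.out.println(runAllTasks(queue)); // prints true
        System.out.println(ran.get());          // prints 2: the failing task did not stop the others
        System.out.println(runAllTasks(queue)); // prints false: nothing left to run
    }
}
```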


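pollTask() takes the first task from the queue for execution and checks whether the selector has to be polled again right away. The tasks in this queue include the operations described previously, such as adding handler contexts, registering the channel, and firing the channel-active event.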

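    // NioEventLoop: re-select first if keys were cancelled in the meantime
    protected Runnable pollTask() {
        Runnable task = super.pollTask();
        if (needsToSelectAgain) {
            selectAgain();
        }
        return task;
    }

    // SingleThreadEventExecutor: skip the WAKEUP_TASK marker and return the next real task
    protected Runnable pollTask() {
        assert inEventLoop();
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            return task;
        }
    }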
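
The sentinel handling in the second pollTask() can be shown with an ordinary queue. In this sketch `WakeupTaskSketch` is a hypothetical class and its `WAKEUP_TASK` is assumed to be a no-op `Runnable` that is recognised by identity and never executed; only the skip-the-marker loop is taken from the excerpt above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of polling past a sentinel task.
class WakeupTaskSketch {
    // Marker object whose only purpose is to unblock a waiting consumer; compared by identity.
    static final Runnable WAKEUP_TASK = () -> { };

    // Returns the next real task, silently dropping any WAKEUP_TASK markers, or null if none is left.
    static Runnable pollTask(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            return task;
        }
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        Runnable realTask = () -> System.out.println("real task ran");
        queue.add(WAKEUP_TASK);
        queue.add(realTask);
        queue.add(WAKEUP_TASK);

        pollTask(queue).run();               // prints "real task ran"
        System.out.println(pollTask(queue)); // prints null: only a marker was left
    }
}
```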



