基于时间轮的定时器HashedWheelTimer
- 一、前言
- 二、时间轮的结构
- 三、HashedWheelTimer的相关组件
- 四、HashedWheelTimer的工作流程
- 五、源码解读
- 5.1 HahedWheelTimer的关键属性
- 5.2 提交延时任务给HahedWheelTimer
- 5.3 工作线程Worker运行的具体步骤
- 六、总结
- 参考
一、前言
最近在阅读Redisson的源码时看到了内部使用了netty提供的这个组件,就想着看下这个定时器具体是如何实现的?
先介绍一下HashedWheelTimer,它是基于时间轮实现
的一个定时器,它的优点是实现相对简单,缺点是无法精确、准时地执行定时任务,只能是近似执行
。
因为时间轮中每个刻度大小可能是100ms也可能1ms,所以在执行任务时,时间上会存在一点误差,在大部分网络应用中,IO任务的执行时间往往不需要那么精确,因此默认每个刻度小大是100ms,但你可以自己来调整刻度大小,最小是1ms。
简单介绍完HahsedWheelTimer,接下来我们先来看下时间轮的结构
二、时间轮的结构
时间轮类似于一个时钟,它和时钟一样是有刻度的,每个刻度大小可以是100ms也可以是1ms,如下图
上图的时间轮有6个刻度,每个刻度大小是100ms,也就是每过100ms会顺时针移动一个刻度,走完一圈需要600ms(下面要介绍的HashedWheelTimer默认刻度数是512,每个刻度大小默认是100ms)
工作原理:每往时间轮提交一个延时任务,会判断该任务的执行时间要放在哪个刻度上
,比如在时间轮启动后的第100ms,提交了一个延时400ms执行的任务,那么该任务应该放在刻度5上,如果提交了一个延迟700ms执行的任务,那么该任务会放在刻度2上,并且会记录该任务还需要走一圈时间轮才能执行。时间轮每移动一个刻度,就会执行当前刻度上的任务,一个刻度上的任务可能会有多个。
因为HashedWheelTimer是基于时间轮的定时器,所以接下来看一下HashedWheelTimer是如何实现的?
三、HashedWheelTimer的相关组件
这里我们可以先看下HashedWheelTimer的UML图,能够对相关组件先有个整体的认识,如下
-
Timer: 定时器接口,提供提交延时任务newTimeout、停止定时器
等方法
-
HashedWheelTimer: 实现Timer接口,内部包含工作线程Worker、时间轮wheel、延时任务队列timeouts、线程池taskExecutor等
-
HashedWheelBucket:上面的时间轮wheel是一个HashedWheelBucket数组,每一个刻度对应一个HashedWheelBucket,而每一个HashedWheelBucket内部是一个HashedWheelTimeout的双向链表
,如下图
-
TimerTask: 延时任务接口,内部只提供一个run方法用于执行
-
Timeout: 对Timer、TimerTask的封装
-
HashedWheelTimeout: 包含了任务的执行时间dealline、所需要的圈数remainingRounds、双向链表中上一个以及下一个HashedWheelTimeout、所在的HashedWheelBucket等
四、HashedWheelTimer的工作流程
大致工作流程如下图:
从上图可以看到,主要分为4步骤,但是准确来说应该是有5步:
- 提交延时任务给HashedWheelTimer,延时任务会先放到任务队列timeouts中
- 工作线程Worker会从任务队列timeouts中获取任务
- 将获取到的HashedWheelTimeout任务放到指定的HashedWheelBucket中
- 取出当前刻度对应的HashedWheelBucket的所有HashedWheelTimeout来执行
- 将刻度tick加1,再回到第二步,如此循环
五、源码解读
5.1 HahedWheelTimer的关键属性
关键属性如下:
- Worker worker:工作线程Worker
- int workerState:工作线程状态
- long tickDuration:刻度大小,
默认是100ms
- HashedWheelBucket[] wheel:
时间轮的每个刻度会对应一个HashedWheelBucket
- Queue timeouts:
任务队列
- Queue cancelledTimeouts:已取消的任务队列
- AtomicLong pendingTimeouts:正在处理的任务数
- Executor taskExecutor:线程池,用于执行任务
- long startTime:
定时器的启动时间
5.2 提交延时任务给HahedWheelTimer
通过newTimeout方法来提交延时任务,newTimeout方法步骤如下:
- 启动工作线程Worker,如果是首次启动,设置启动时间startTime,如果已启动,则跳过
- 计算延时任务的deadline(
当前时间 + 延迟时间 - 启动时间startTime
),用于判断后续放到时间轮的哪个HashedWheelBucket中 - 将延时任务封装为HashedWheelTimeout,并
添加到任务队列timeouts
中
结合源码来看:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {start();long deadline &#61; System.nanoTime() &#43; unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline &#61; Long.MAX_VALUE;}HashedWheelTimeout timeout &#61; new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;
}
5.3 工作线程Worker运行的具体步骤
Worker类中有一个关键的属性tick
&#xff0c;代表相对于定时器的启动时间startTime&#xff0c;当前已经走到了哪个刻度
&#xff0c;tick只会一直往上递增&#xff0c;初始值为0
具体步骤如下&#xff1a;
- 等到下一个刻度来临&#xff0c;即当前时间 > 当前刻度tick的结束时间
a) 计算当前刻度tick的结束时间
&#xff0c;比如Worker刚启动&#xff0c;当前刻度tick为0&#xff0c;那么刻度tick的结束时间 &#61; tickDuration * (tick &#43; 1)&#xff0c;即100ms
b) 判断当前时间&#xff08;相对于启动时间startTime
&#xff09;是否大于当前刻度的结束时间&#xff0c;如果大于&#xff0c;说明当前时间已经过了当前刻度的结束时间&#xff0c;开始准备处理当前刻度的所有任务。如果小于&#xff0c;说明当前时间还没到当前刻度的结束时间&#xff0c;主动sleep一段时间后继续判断&#xff0c;直到当前时间大于当前刻度的结束时间。 - 从任务队列timeouts中获取任务&#xff0c;将延时任务的deadline除以tickDuration&#xff0c;计算出该任务的
总刻度数以及还需要的圈数
&#xff0c;通过 **总刻度数 & &#xff08;wheel.length -1 &#xff09;**来算出放在哪个HashedWheelBucket中&#xff08;比如算出A任务的总刻度数 &#61; 1026&#xff0c;当前刻度 &#61; 25&#xff0c;时间轮的刻度有512个&#xff0c;那么算出还需要的圈数是1【如果当前刻度 &#61; 1&#xff0c;那么还需要的圈数会是2】&#xff0c;放在下标为2的HashedWheelBucket中
&#xff09; - 获取当前刻度对应的HashedWheelBucket&#xff0c;从head开始逐一遍历任务链表&#xff0c;如果延时任务的所需圈数为0&#xff0c;开始执行任务&#xff0c;否则所需圈数减1。
刻度tick加1
&#xff0c;回到第一步&#xff0c;如此循环
结合源码来看
public void run() {startTime &#61; System.nanoTime();startTimeInitialized.countDown();do {final long deadline &#61; waitForNextTick();if (deadline > 0) {int idx &#61; (int) (tick & mask);processCancelledTasks();HashedWheelBucket bucket &#61; wheel[idx];transferTimeoutsToBuckets();bucket.expireTimeouts(deadline);tick&#43;&#43;;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) &#61;&#61; WORKER_STATE_STARTED);
}
这里我们先来看下第一步waitForNextTick方法的具体实现
private long waitForNextTick() {long deadline &#61; tickDuration * (tick &#43; 1);for (;;) {final long currentTime &#61; System.nanoTime() - startTime;long sleepTimeMs &#61; (deadline - currentTime &#43; 999999) / 1000000;if (sleepTimeMs <&#61; 0) {if (currentTime &#61;&#61; Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) &#61;&#61; WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}
}
接着看下第二步的transferTimeoutsToBuckets方法&#xff0c;如下
private void transferTimeoutsToBuckets() {for (int i &#61; 0; i < 100000; i&#43;&#43;) {HashedWheelTimeout timeout &#61; timeouts.poll();if (timeout &#61;&#61; null) {break;}if (timeout.state() &#61;&#61; HashedWheelTimeout.ST_CANCELLED) {continue;}long calculated &#61; timeout.deadline / tickDuration;timeout.remainingRounds &#61; (calculated - tick) / wheel.length;final long ticks &#61; Math.max(calculated, tick);int stopIndex &#61; (int) (ticks & mask);HashedWheelBucket bucket &#61; wheel[stopIndex];bucket.addTimeout(timeout);}
}
最后看下第三步bucket.expireTimeouts&#xff0c;源码如下&#xff1a;
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout &#61; head;while (timeout !&#61; null) {HashedWheelTimeout next &#61; timeout.next;if (timeout.remainingRounds <&#61; 0) {next &#61; remove(timeout);if (timeout.deadline <&#61; deadline) {timeout.expire();} else {throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next &#61; remove(timeout);} else {timeout.remainingRounds --;}timeout &#61; next;}
}
至此&#xff0c;工作线程Worker运行的具体步骤以及部分源码的解读就完成了
六、总结
HashedWheelTimer只是定时器的一种简单实现&#xff0c;像java中常见的定时器还有Timer、ScheduledThreadPoolExecutor等&#xff0c;从上面分析它的实现原理可知&#xff0c;它无法应用于需要精确执行的场景&#xff0c;但是在网络应用中&#xff0c;IO任务的执行时间往往不需要精确&#xff0c;所以它可以在任务较多、但任务不需要精确执行
的场景下进行使用。
参考
【Netty】八、基于时间轮的定时器HashedWheelTimer