热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于时间轮的定时器HashedWheelTimer

基于时间轮的定时器HashedWheelTimer一、前言二、时间轮的结构三、HashedWheelTimer的相关组件四、HashedWheelTimer的工作流程五、源码解读5

基于时间轮的定时器HashedWheelTimer

    • 一、前言
    • 二、时间轮的结构
    • 三、HashedWheelTimer的相关组件
    • 四、HashedWheelTimer的工作流程
    • 五、源码解读
      • 5.1 HahedWheelTimer的关键属性
      • 5.2 提交延时任务给HahedWheelTimer
      • 5.3 工作线程Worker运行的具体步骤
    • 六、总结
    • 参考




一、前言

最近在阅读Redisson的源码时看到了内部使用了netty提供的这个组件,就想着看下这个定时器具体是如何实现的?

先介绍一下HashedWheelTimer,它是基于时间轮实现的一个定时器,它的优点是实现相对简单,缺点是无法精确、准时地执行定时任务,只能是近似执行

因为时间轮中每个刻度大小可能是100ms也可能1ms,所以在执行任务时,时间上会存在一点误差,在大部分网络应用中,IO任务的执行时间往往不需要那么精确,因此默认每个刻度小大是100ms,但你可以自己来调整刻度大小,最小是1ms。

简单介绍完HahsedWheelTimer,接下来我们先来看下时间轮的结构

二、时间轮的结构

时间轮类似于一个时钟,它和时钟一样是有刻度的,每个刻度大小可以是100ms也可以是1ms,如下图

在这里插入图片描述

上图的时间轮有6个刻度,每个刻度大小是100ms,也就是每过100ms会顺时针移动一个刻度,走完一圈需要600ms(下面要介绍的HashedWheelTimer默认刻度数是512,每个刻度大小默认是100ms)

工作原理:每往时间轮提交一个延时任务,会判断该任务的执行时间要放在哪个刻度上,比如在时间轮启动后的第100ms,提交了一个延时400ms执行的任务,那么该任务应该放在刻度5上,如果提交了一个延迟700ms执行的任务,那么该任务会放在刻度2上,并且会记录该任务还需要走一圈时间轮才能执行。时间轮每移动一个刻度,就会执行当前刻度上的任务,一个刻度上的任务可能会有多个

因为HashedWheelTimer是基于时间轮的定时器,所以接下来看一下HashedWheelTimer是如何实现的?

三、HashedWheelTimer的相关组件

这里我们可以先看下HashedWheelTimer的UML图,能够对相关组件先有个整体的认识,如下
在这里插入图片描述

  • Timer: 定时器接口,提供提交延时任务newTimeout、停止定时器等方法

  • HashedWheelTimer: 实现Timer接口,内部包含工作线程Worker、时间轮wheel、延时任务队列timeouts、线程池taskExecutor等

  • HashedWheelBucket:上面的时间轮wheel是一个HashedWheelBucket数组,每一个刻度对应一个HashedWheelBucket,而每一个HashedWheelBucket内部是一个HashedWheelTimeout的双向链表,如下图
    在这里插入图片描述

  • TimerTask: 延时任务接口,内部只提供一个run方法用于执行

  • Timeout: 对Timer、TimerTask的封装

  • HashedWheelTimeout: 包含了任务的执行时间dealline、所需要的圈数remainingRounds、双向链表中上一个以及下一个HashedWheelTimeout、所在的HashedWheelBucket等


四、HashedWheelTimer的工作流程

大致工作流程如下图:
在这里插入图片描述

从上图可以看到,主要分为4步骤,但是准确来说应该是有5步:

  1. 提交延时任务给HashedWheelTimer,延时任务会先放到任务队列timeouts中
  2. 工作线程Worker会从任务队列timeouts中获取任务
  3. 将获取到的HashedWheelTimeout任务放到指定的HashedWheelBucket中
  4. 取出当前刻度对应的HashedWheelBucket的所有HashedWheelTimeout来执行
  5. 将刻度tick加1,再回到第二步,如此循环

五、源码解读


5.1 HahedWheelTimer的关键属性

关键属性如下:

  • Worker worker:工作线程Worker
  • int workerState:工作线程状态
  • long tickDuration:刻度大小,默认是100ms
  • HashedWheelBucket[] wheel:时间轮的每个刻度会对应一个HashedWheelBucket
  • Queue timeouts:任务队列
  • Queue cancelledTimeouts:已取消的任务队列
  • AtomicLong pendingTimeouts:正在处理的任务数
  • Executor taskExecutor:线程池,用于执行任务
  • long startTime:定时器的启动时间

5.2 提交延时任务给HahedWheelTimer

通过newTimeout方法来提交延时任务,newTimeout方法步骤如下:

  1. 启动工作线程Worker,如果是首次启动,设置启动时间startTime,如果已启动,则跳过
  2. 计算延时任务的deadline(当前时间 + 延迟时间 - 启动时间startTime),用于判断后续放到时间轮的哪个HashedWheelBucket中
  3. 将延时任务封装为HashedWheelTimeout,并添加到任务队列timeouts

结合源码来看:

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// 部分代码省略// 启动工作线程Workerstart();// 计算deadlinelong deadline &#61; System.nanoTime() &#43; unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline &#61; Long.MAX_VALUE;}// 封装为HashedWheelTimeout&#xff0c;并添加到任务队列timeouts中HashedWheelTimeout timeout &#61; new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;
}

5.3 工作线程Worker运行的具体步骤

Worker类中有一个关键的属性tick&#xff0c;代表相对于定时器的启动时间startTime&#xff0c;当前已经走到了哪个刻度&#xff0c;tick只会一直往上递增&#xff0c;初始值为0

具体步骤如下&#xff1a;

  1. 等到下一个刻度来临&#xff0c;即当前时间 > 当前刻度tick的结束时间
    a) 计算当前刻度tick的结束时间&#xff0c;比如Worker刚启动&#xff0c;当前刻度tick为0&#xff0c;那么刻度tick的结束时间 &#61; tickDuration * (tick &#43; 1)&#xff0c;即100ms
    b) 判断当前时间&#xff08;相对于启动时间startTime&#xff09;是否大于当前刻度的结束时间&#xff0c;如果大于&#xff0c;说明当前时间已经过了当前刻度的结束时间&#xff0c;开始准备处理当前刻度的所有任务。如果小于&#xff0c;说明当前时间还没到当前刻度的结束时间&#xff0c;主动sleep一段时间后继续判断&#xff0c;直到当前时间大于当前刻度的结束时间。
  2. 从任务队列timeouts中获取任务&#xff0c;将延时任务的deadline除以tickDuration&#xff0c;计算出该任务的总刻度数以及还需要的圈数&#xff0c;通过 **总刻度数 & &#xff08;wheel.length -1 &#xff09;**来算出放在哪个HashedWheelBucket中&#xff08;比如算出A任务的总刻度数 &#61; 1026&#xff0c;当前刻度 &#61; 25&#xff0c;时间轮的刻度有512个&#xff0c;那么算出还需要的圈数是1【如果当前刻度 &#61; 1&#xff0c;那么还需要的圈数会是2】&#xff0c;放在下标为2的HashedWheelBucket中&#xff09;
  3. 获取当前刻度对应的HashedWheelBucket&#xff0c;从head开始逐一遍历任务链表&#xff0c;如果延时任务的所需圈数为0&#xff0c;开始执行任务&#xff0c;否则所需圈数减1。
  4. 刻度tick加1&#xff0c;回到第一步&#xff0c;如此循环

结合源码来看

public void run() {// 初始化定时器的启动时间startTimestartTime &#61; System.nanoTime();startTimeInitialized.countDown();do {// 1、等到下一个刻度来临&#xff0c;即当前时间 > 当前刻度tick的结束时间final long deadline &#61; waitForNextTick();if (deadline > 0) {// 获取当前刻度tick对应的HashedWheelBucketint idx &#61; (int) (tick & mask);processCancelledTasks();HashedWheelBucket bucket &#61; wheel[idx];// 2、从任务队列timeouts中获取任务&#xff0c;并将任务放入到对应的HashedWheelBucket中 transferTimeoutsToBuckets();// 3、执行当前刻度tick对应的HashedWheelBucket中的所有任务bucket.expireTimeouts(deadline);// 4、将当前刻度tick加1tick&#43;&#43;;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) &#61;&#61; WORKER_STATE_STARTED);// 部分代码省略
}

这里我们先来看下第一步waitForNextTick方法的具体实现

private long waitForNextTick() {long deadline &#61; tickDuration * (tick &#43; 1);for (;;) {// 相对于startTime的当前时间final long currentTime &#61; System.nanoTime() - startTime;long sleepTimeMs &#61; (deadline - currentTime &#43; 999999) / 1000000;// 如果当前刻度tick的结束时间 <当前时间&#xff0c;说明当前时间已经过了当前刻度的结束时间&#xff0c;直接返回当前时间if (sleepTimeMs <&#61; 0) {if (currentTime &#61;&#61; Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// 否则主动sleep一段时间&#xff0c;上面的条件成立try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) &#61;&#61; WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}
}

接着看下第二步的transferTimeoutsToBuckets方法&#xff0c;如下

private void transferTimeoutsToBuckets() {// 这里一次最多从队列里获取100000个任务for (int i &#61; 0; i < 100000; i&#43;&#43;) {HashedWheelTimeout timeout &#61; timeouts.poll();if (timeout &#61;&#61; null) {// 代表队列里已经没有任务&#xff0c;直接返回break;}if (timeout.state() &#61;&#61; HashedWheelTimeout.ST_CANCELLED) {continue;}// 计算总刻度数 &#61; 延时任务的deadline / 刻度大小long calculated &#61; timeout.deadline / tickDuration;// 计算还需要的圈数 &#61; 总刻度数 - 当前刻度 / 时间轮的刻度数timeout.remainingRounds &#61; (calculated - tick) / wheel.length;final long ticks &#61; Math.max(calculated, tick);// 计算放在哪个下标int stopIndex &#61; (int) (ticks & mask);HashedWheelBucket bucket &#61; wheel[stopIndex];// 将该任务放入到对应的HashedWheelBucket中bucket.addTimeout(timeout);}
}

最后看下第三步bucket.expireTimeouts&#xff0c;源码如下&#xff1a;

public void expireTimeouts(long deadline) {HashedWheelTimeout timeout &#61; head;// 处理该HashedWheelBucket的所有任务while (timeout !&#61; null) {HashedWheelTimeout next &#61; timeout.next;if (timeout.remainingRounds <&#61; 0) {// 将任务从双向链表中移除next &#61; remove(timeout);if (timeout.deadline <&#61; deadline) {// 执行任务timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next &#61; remove(timeout);} else {// 如果所需圈数 > 0&#xff0c;则将其减1timeout.remainingRounds --;}timeout &#61; next;}
}

至此&#xff0c;工作线程Worker运行的具体步骤以及部分源码的解读就完成了

六、总结

HashedWheelTimer只是定时器的一种简单实现&#xff0c;像java中常见的定时器还有Timer、ScheduledThreadPoolExecutor等&#xff0c;从上面分析它的实现原理可知&#xff0c;它无法应用于需要精确执行的场景&#xff0c;但是在网络应用中&#xff0c;IO任务的执行时间往往不需要精确&#xff0c;所以它可以在任务较多、但任务不需要精确执行的场景下进行使用。



参考

【Netty】八、基于时间轮的定时器HashedWheelTimer


推荐阅读
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • Python多线程编程技巧与实战应用详解 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 详解 Qt 串口通信程序全程图文 (4)
    Qt串口通信程序全程图文是本文介绍的内容,本文一开始先讲解对程序的改进,在文章最后将要讲解一些重要问题。1、在窗口中加入一些组合框ComboBox&# ... [详细]
  • 本报告对2018年湘潭大学程序设计竞赛在牛客网上的时间数据进行了详细分析。通过统计参赛者在各个时间段的活跃情况,揭示了比赛期间的编程频率和时间分布特点。此外,报告还探讨了选手在准备过程中面临的挑战,如保持编程手感、学习逆向工程和PWN技术,以及熟悉Linux环境等。这些发现为未来的竞赛组织和培训提供了 valuable 的参考。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 题目解析给定 n 个人和 n 种书籍,每个人都有一个包含自己喜好的书籍列表。目标是计算出满足以下条件的分配方案数量:1. 每个人都必须获得他们喜欢的书籍;2. 每本书只能分配给一个人。通过使用深度优先搜索算法,可以系统地探索所有可能的分配组合,确保每个分配方案都符合上述条件。该方法能够有效地处理这类组合优化问题,找到所有可行的解。 ... [详细]
author-avatar
抬头看着海
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有