热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入解析ScheduledThreadPoolExecutor并发执行机制源代码

本文深入剖析了ScheduledThreadPoolExecutor的并发执行机制及其源代码,详细解读了该线程池如何在指定延时或定期执行任务,探讨了其内部的工作原理和优化策略,为开发者提供了宝贵的参考和实践指导。

ScheduledThreadPoolExecutor是一个可以在指定一定延时时间后或者定时进行任务调度的线程池,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。线程池的队列是DelayedWorkQueue(他是ScheduledThreadPoolExecutor的一个内部类)。
在这里插入图片描述

还要在看一下ScheduledFutureTask(同样是ScheduledThreadPoolExecutor的一个内部类),继承FutureTask,FutureTask的内部有一个变量state用来表示任务的状态,一开始状态为NEW。下面是所有状态定义。

private volatile int state;private static final int NEW = 0;//初始化状态private static final int COMPLETING = 1;//执行中状态private static final int NORMAL = 2;//正常运行结束状态private static final int EXCEPTIONAL = 3;//运行中异常private static final int CANCELLED = 4;//任务被取消private static final int INTERRUPTING = 5;//任务正在被中段private static final int INTERRUPTED = 6;//任务已经被中断

ScheduledFutureTask内部还有一个变量period用来表示任务的类型,如果==0,则表示任务是一次性的,任务执行完毕后退出,如果为负数,说明当前任务是以固定延时的定时可重复执行任务,如果为正数,说明任务是以固定频率的定时定时可重复执行任务。

一、chedule(Runnable command, long delay, TimeUnit unit)方法解析

他的作用是提交一个延时执行的任务,任务从提交时间算起延时单位为unit的delay时间后开始执行,任务只会执行一次。

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//判断参数是否为空&#xff0c;空则跑出异常if (command &#61;&#61; null || unit &#61;&#61; null)throw new NullPointerException();//任务转换RunnableScheduledFuture<?> t &#61; decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//添加任务到延时队列delayedExecute(t);return t;}

首先是参数判断&#xff0c;为空则抛出异常&#xff0c;接着是装饰任务&#xff0c;把提交的command(Runnable对象)转换为ScheduledFutureTask&#xff0c;ScheduledFutureTask是具体放入延时队列里面的东西&#xff0c;由于是延时任务&#xff0c;所以ScheduledFutureTask实现了getDelay和compareTo方法&#xff0c;triggerTime方法将延时时间转换为绝对时间&#xff0c;也就是把当前时间的纳秒加上延迟的纳秒后的值。

ScheduledFutureTask的构造如下&#xff0c;设period为0&#xff0c;表示该任务是一次性任务。

ScheduledFutureTask(Callable<V> callable, long ns) {super(callable);this.time &#61; ns;this.period &#61; 0;this.sequenceNumber &#61; sequencer.getAndIncrement();}

然后通过delayedExecute将任务添加到延时队列。

private void delayedExecute(RunnableScheduledFuture<?> task) {//如果线程池管理&#xff0c;则执行线程拒绝策略if (isShutdown())reject(task);else {//添加任务到延时队列super.getQueue().add(task);//再次检查线程池状态if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else//确保至少一个线程在处理任务ensurePrestart();}}

上述代码首先确保线程池没关闭&#xff0c;关闭则执行拒绝策略&#xff0c;没关闭将任务添加到延时队列&#xff0c;添加后再重新检查线程池是否关闭&#xff0c;如果关闭则从延时队列里面删除刚才添加的任务。

再看ensurePrestart方法。

void ensurePrestart() {int wc &#61; workerCountOf(ctl.get());//增加核心线程数if (wc < corePoolSize)addWorker(null, true);else if (wc &#61;&#61; 0)addWorker(null, false);}

首先获取到了线程池中的线程数&#xff0c;如果个数小于核心线程池则新增一个线程&#xff0c;否则如果当前线程数为0个&#xff0c;则同样新增一个线程。

我们知道ThreadPoolExecutor在具体执行任务的线程是Worker线程&#xff0c;Worker线程调用具体任务的run方法来执行&#xff0c;在这里的任务是ScheduledFutureTask&#xff0c;所以来看看ScheduledFutureTask的run方法。

public void run() {//是否执行一次boolean periodic &#61; isPeriodic();//取消任务if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();//定时任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}
}

首先判断任务是一次性的还是可重复执行的任务&#xff0c;ScheduledFutureTask在构造方法中已经设置了period为0&#xff0c;所以&#xff0c;这里会返回false。

public boolean isPeriodic() {return period !&#61; 0;}

然后判断当前任务是否应该被取消。为true则取消任务。

由于periodic为false&#xff0c;则会执行代码ScheduledFutureTask.super.run();&#xff0c;调用父类FutureTask的run方法。

public void run() {if (state !&#61; NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c &#61; callable;if (c !&#61; null && state &#61;&#61; NEW) {V result;boolean ran;try {result &#61; c.call();ran &#61; true;} catch (Throwable ex) {result &#61; null;ran &#61; false;setException(ex);}if (ran)set(result);}} finally {runner &#61; null;int s &#61; state;if (s >&#61; INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

FutureTask.run()首先判断任务状态&#xff0c;如果不是NEW则直接返回&#xff0c;或者如果任务状态为NEW&#xff0c;但是使用CAS设置当前任务的持有者为当前线程失败则直接返回。

然后具体调用callable的call方法执行任务&#xff0c;如果任务执行成功则修改任务状态&#xff0c;也就是set方法。

protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome &#61; v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

使用CAS将当前任务的状态从NEW转换的COMPLETING。这里当有多个线程调用时只有一个线程会成功&#xff0c;成功的线程在通过 UNSAFE.putOrderedInt设置任务的状态为正常结束状态。

还有在任务执行失败后&#xff0c;执行setException方法&#xff0c;和set方法类似了。

protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome &#61; t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}

二、scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit)方法解析

他的作用是&#xff0c;当任务执行完毕后&#xff0c;让其延迟固定时间后再次运行&#xff0c;initialDelay表示提交任务后延迟多少时间开始执行任务command&#xff0c;delay表示当任务执行完毕后延长多少时间后再次运行command&#xff0c;unit是时间单位。

这个任务会一直重复运行下去&#xff0c;直到任务中抛出异常、被取消、线程池关闭。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {//参数判断if (command &#61;&#61; null || unit &#61;&#61; null)throw new NullPointerException();if (delay <&#61; 0)throw new IllegalArgumentException();//任务转换ScheduledFutureTask<Void> sft &#61;new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t &#61; decorateTask(command, sft);sft.outerTask &#61; t;//添加任务到队列delayedExecute(t);return t;}

首先也是参数判断&#xff0c;为空则抛出异常&#xff0c;然后将command任务转换为ScheduledFutureTask&#xff0c;然后添加延迟到队列。

将任务添加到队列后线程池线程会从队列中获取任务&#xff0c;然后调用ScheduledFutureTask的run方法&#xff0c;由于这里period<0&#xff0c;所以isPeriodic返回true&#xff0c;则会执行方法runAndReset()。

protected boolean runAndReset() {if (state !&#61; NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran &#61; false;int s &#61; state;try {Callable<V> c &#61; callable;if (c !&#61; null && s &#61;&#61; NEW) {try {c.call(); ran &#61; true;} catch (Throwable ex) {setException(ex);}}} finally {runner &#61; null;s &#61; state;if (s >&#61; INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s &#61;&#61; NEW;
}

他在任务执行完毕后不会设置任务的状态&#xff0c;是为了让任务可重复执行&#xff0c;看最后一句&#xff0c;判断如果当前任务正常执行完毕并且任务状态为NEW则返回true&#xff0c;如果返回true则执行方法setNextRunTime()&#xff0c;用于设置任务下一次的执行时间。

这里p是<0的&#xff0c;然后设置timer为当前时间加上-p&#xff0c;也就是延迟-p时间后再次执行。

private void setNextRunTime() {long p &#61; period;if (p > 0)time &#43;&#61; p;elsetime &#61; triggerTime(-p);}

三、scheduleAtFixedRate(Runnable command, long initialDelay, long period,TimeUnit unit)方法解析

该方法相对起始时间点以固定频率调用指定任务&#xff0c;当把任务提交到线程池并延迟initialDelay时间后开始执行任务command&#xff0c;然后从initialDelay&#43;period时间点再次执行&#xff0c;而后在initialDelay&#43;2*period时间点再次执行&#xff0c;直到抛出异常或者取消、关闭线程池。

原理和scheduleWithFixedDelay类似&#xff0c;我们就看几个不同点&#xff0c;

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command &#61;&#61; null || unit &#61;&#61; null)throw new NullPointerException();if (period <&#61; 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft &#61;new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t &#61; decorateTask(command, sft);sft.outerTask &#61; t;delayedExecute(t);return t;}

首先是period&#61;period&#xff0c;不再是-period。所以当前任务执行完毕后调用setNextRunTime设置任务下次执行的时间是 time &#43;&#61; p。

private void setNextRunTime() {long p &#61; period;if (p > 0)time &#43;&#61; p;elsetime &#61; triggerTime(-p);}

如果当前任务还没有执行完&#xff0c;下一次执行任务的时间到了&#xff0c;则不会并发执行&#xff0c;下次要执行的任务会延迟&#xff0c;要等到当前任务执行完毕后再次执行


推荐阅读
  • 目录 ScheduledThreadPoolExecutor 一、主程序 二、自定义周期线程池 三、自定义周期任务类 四、一个被执行的普通任务 五、执行结果ScheduledThr ... [详细]
  • 如何使用 `com.amazonaws.services.sqs.model.DeleteMessageRequest` 的 `getQueueUrl()` 方法及其代码示例解析 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • Java队列机制深度解析与应用指南
    Java队列机制在并发编程中扮演着重要角色。本文深入解析了Java队列的各种实现类及其应用场景,包括`LinkedList`、`ArrayBlockingQueue`和`PriorityQueue`等,并探讨了它们在高并发环境下的性能表现和适用场景。通过详细分析这些队列的内部机制和使用技巧,帮助开发者更好地理解和应用Java队列,提升系统的设计和架构能力。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • Go语言中Goroutine与通道机制及其异常处理深入解析
    在Go语言中,Goroutine可视为一种轻量级的并发执行单元,其资源消耗远低于传统线程,初始栈大小仅为2KB,而普通线程则通常需要几MB。此外,Goroutine的调度由Go运行时自动管理,能够高效地支持成千上万个并发任务。本文深入探讨了Goroutine的工作原理及其与通道(channel)的配合使用,特别是在异常处理方面的最佳实践,为开发者提供了一套完整的解决方案,以确保程序的稳定性和可靠性。 ... [详细]
  • 掌握并发编程的关键:深入解析三大核心挑战
    掌握并发编程的关键:深入解析三大核心挑战 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 本文详细介绍了 Ansible Ad-Hoc 命令的使用方法,基于官方文档进行了中文翻译。Ad-Hoc 命令允许用户通过 `usr/bin/ansible` 快速执行一次性任务,适用于快速部署、配置管理和故障排查等场景。文中通过多个实例演示了 Ad-Hoc 命令的具体应用,帮助读者更好地理解和掌握这一强大工具。 ... [详细]
  • 深入解析Java线程池原理及源码,助你轻松应对面试提问
    线程池作为一种高效的线程管理机制,在Java中扮演着重要角色。它通过预先创建并维护一定数量的线程,避免了频繁创建和销毁线程带来的性能开销,从而提高了应用程序的响应速度和系统稳定性。本文将深入探讨Java线程池的工作原理及其源码实现,帮助读者更好地理解和应用这一核心概念,为面试中的相关问题提供有力支持。 ... [详细]
  • 利用Java开发百度图片爬虫,实现高效下载功能
    为了满足大量图像素材的需求以支持机器学习项目,本文介绍了一种基于Java语言开发的百度图片爬虫工具,该工具能够高效地抓取并下载百度图片中的资源。文章首先展示了爬虫运行的效果图,并详细阐述了其工作原理和技术实现路径,重点解析了如何通过分析百度图片的网页结构来实现精准抓取。此外,还讨论了在实际应用中可能遇到的问题及解决方案。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 本研究基于状态空间方法,通过动态可视化技术实现了汉诺塔问题的求解过程,即将n个盘子从A柱移动到C柱。本文提供了一个使用C语言在控制台进行动画绘制的示例,并详细注释了程序逻辑,以帮助读者更好地理解和学习该算法。 ... [详细]
  • 基于灰度直方图的水果识别系统开发:MATLAB源代码及图形用户界面设计
    基于灰度直方图的水果识别系统开发:MATLAB源代码及图形用户界面设计 ... [详细]
author-avatar
航头党员之家
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有