一、前言
上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到SFTP服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。
二、Springboot引入定时任务核心配置
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documentedpublic @interfaceEnableScheduling {
}
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public classSchedulingConfiguration {
@Bean(name=TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return newScheduledAnnotationBeanPostProcessor();
}
}
接下来主要看一下这个核心后置处理器:ScheduledAnnotationBeanPostProcessor 。
@Override
publicObject postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceofScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
returnbean;
}
Class> targetClass =AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup>) method -> {
Set scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if(annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if(logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " +targetClass);
}
}
else{
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->scheduledMethods.forEach(scheduled ->processScheduled(scheduled, method, bean)));
if(logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " +annotatedMethods);
}
}
}
returnbean;
}
1、处理Scheduled注解,通过ScheduledTaskRegistrar注册定时任务。
private voidfinishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);
}if (this.beanFactory instanceofListableBeanFactory) {
Map beans =((ListableBeanFactory)this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List configurers &#61; new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);for(SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}if (this.registrar.hasTasks() && this.registrar.getScheduler() &#61;&#61; null) {
Assert.state(this.beanFactory !&#61; null, "BeanFactory must be set to find scheduler by type");try{//Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}catch(NoUniqueBeanDefinitionException ex) {
logger.trace("Could not find unique TaskScheduler bean", ex);try{this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}catch(NoSuchBeanDefinitionException ex2) {if(logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " &#43;
"none is named &#39;taskScheduler&#39;. Mark one of them as primary or name it &#39;taskScheduler&#39; " &#43;
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " &#43;
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " &#43;ex.getBeanNamesFound());
}
}
}catch(NoSuchBeanDefinitionException ex) {
logger.trace("Could not find default TaskScheduler bean", ex);//Search for ScheduledExecutorService bean next...
try{this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}catch(NoUniqueBeanDefinitionException ex2) {
logger.trace("Could not find unique ScheduledExecutorService bean", ex2);try{this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}catch(NoSuchBeanDefinitionException ex3) {if(logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " &#43;
"none is named &#39;taskScheduler&#39;. Mark one of them as primary or name it &#39;taskScheduler&#39; " &#43;
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " &#43;
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " &#43;ex2.getBeanNamesFound());
}
}
}catch(NoSuchBeanDefinitionException ex2) {
logger.trace("Could not find default ScheduledExecutorService bean", ex2);//Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}this.registrar.afterPropertiesSet();
}
1、通过一系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。
2、向ScheduledTaskRegistrar注册一个TaskScheduler(用于对Runnable的任务进行调度&#xff0c;它包含有多种触发规则)。
3、registrar.afterPropertiesSet()&#xff0c;在这开始安排所有的定时任务开始执行了。
protected voidscheduleTasks() {if (this.taskScheduler &#61;&#61; null) {
this.localExecutor &#61; Executors.newSingleThreadScheduledExecutor();
this.taskScheduler &#61; new ConcurrentTaskScheduler(this.localExecutor);
}if (this.triggerTasks !&#61; null) {for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}if (this.cronTasks !&#61; null) {for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}if (this.fixedRateTasks !&#61; null) {for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}if (this.fixedDelayTasks !&#61; null) {for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
1、TriggerTask&#xff1a;动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下文确定下一个执行时间。
2、CronTask&#xff1a;动态定时任务&#xff0c;TriggerTask子类。通过cron表达式确定的时间触发下一个任务执行。
3、IntervalTask&#xff1a;一定时间延迟之后&#xff0c;周期性执行的任务。
4、taskScheduler 如果为空&#xff0c;默认是ConcurrentTaskScheduler&#xff0c;并使用默认单线程的ScheduledExecutor。
三、主要看一下CronTask工作原理
ScheduledTaskRegistrar.java
&#64;NullablepublicScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask&#61; this.unresolvedTasks.remove(task);boolean newTask &#61; false;if (scheduledTask &#61;&#61; null) {
scheduledTask&#61; newScheduledTask(task);
newTask&#61; true;
}if (this.taskScheduler !&#61; null) {
scheduledTask.future&#61; this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}else{
addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);
}return (newTask ? scheduledTask : null);
}
ConcurrentTaskScheduler.java
&#64;Override
&#64;Nullablepublic ScheduledFuture>schedule(Runnable task, Trigger trigger) {try{if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}else{
ErrorHandler errorHandler&#61;(this.errorHandler !&#61; null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}catch(RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" &#43; this.scheduledExecutor &#43; "] did not accept task: " &#43;task, ex);
}
}
ReschedulingRunnable.java
&#64;Nullablepublic ScheduledFuture>schedule() {synchronized (this.triggerContextMonitor) {this.scheduledExecutionTime &#61; this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime &#61;&#61; null) {return null;
}long initialDelay &#61; this.scheduledExecutionTime.getTime() -System.currentTimeMillis();this.currentFuture &#61; this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;
}
}private ScheduledFuture>obtainCurrentFuture() {
Assert.state(this.currentFuture !&#61; null, "No scheduled future");return this.currentFuture;
}
&#64;Overridepublic voidrun() {
Date actualExecutionTime&#61; newDate();super.run();
Date completionTime&#61; newDate();synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime !&#61; null, "No scheduled execution");this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);if (!obtainCurrentFuture().isCancelled()) {
schedule();
}
}
}
1、最终将task和trigger都封装到了ReschedulingRunnable中。
2、ReschedulingRunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象&#xff0c;executor会调用run方法&#xff0c;run方法又调用了schedule方法)。
3、ReschedulingRunnable schedule方法加了同步锁&#xff0c;只能有一个线程拿到下次执行时间并加入执行器的调度。
4、不同的ReschedulingRunnable对象之间在线程池够用的情况下是不会相互影响的&#xff0c;也就是说满足线程池的条件下&#xff0c;TaskScheduler的schedule方法的多次调用是可以交叉执行的。
ScheduledThreadPoolExecutor.javapublic ScheduledFuture>schedule(Runnable command,longdelay,
TimeUnit unit) {if (command &#61;&#61; null || unit &#61;&#61; null)throw newNullPointerException();
RunnableScheduledFuture> t &#61;decorateTask(command,new ScheduledFutureTask(command, null,
triggerTime(delay, unit)));
delayedExecute(t);returnt;
}private void delayedExecute(RunnableScheduledFuture>task) {if(isShutdown())
reject(task);else{super.getQueue().add(task);if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))
task.cancel(false);elseensurePrestart();
}
}
ScheduledFutureTask 工作原理如下图所示【太懒了&#xff0c;不想画图了&#xff0c;盗图一张】。
1、ScheduledFutureTask会放入优先阻塞队列&#xff1a;ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆实现)
2、上图中的Thread对象即ThreadPoolExecutor.Worker&#xff0c;实现了Runnable接口
/*** Creates with given first task and thread from ThreadFactory.
*&#64;paramfirstTask the first task (null if none)*/Worker(Runnable firstTask) {
setState(-1); //inhibit interrupts until runWorker
this.firstTask &#61;firstTask;this.thread &#61; getThreadFactory().newThread(this);
}/**Delegates main run loop to outer runWorker*/
public voidrun() {
runWorker(this);
}
1、Worker中维护了Thread对象&#xff0c;Thread对象的Runnable实例即Worker自身
2、ThreadPoolExecutor#addWorker方法中会创建Worker对象&#xff0c;然后拿到Worker中的thread实例并start&#xff0c;这样就创建了线程池中的一个线程实例
3、Worker的run方法会调用ThreadPoolExecutor#runWorker方法&#xff0c;这才是任务最终被执行的地方&#xff0c;该方法示意如下
(1)首先取传入的task执行&#xff0c;如果task是null&#xff0c;只要该线程池处于运行状态&#xff0c;就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向 workQueue队列中offer任务。
getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束&#xff0c;返回的是null&#xff0c;runWorker循环结束&#xff0c;执行processWorkerExit方法。
至此&#xff0c;该线程结束自己的使命&#xff0c;从线程池中“消失”。
(2)在开始执行任务之前&#xff0c;会调用Worker的lock方法&#xff0c;目的是阻止task正在被执行的时候被interrupt&#xff0c;通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法)&#xff0c;该线程没有自己的interrupt set了。
(3)beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作&#xff0c;这两个方法是空的&#xff0c;留给继承类去填充功能。
我们可以在beforeExecute方法中抛出异常&#xff0c;这样task不会被执行&#xff0c;而且在跳出该循环的时候completedAbruptly的值是true&#xff0c;表示the worker died due to user exception&#xff0c;会用decrementWorkerCount调整wc。
(4)因为Runnable的run方法不能抛出Throwables异常&#xff0c;所以这里重新包装异常然后抛出&#xff0c;抛出的异常会使当当前线程死掉&#xff0c;可以在afterExecute中对异常做一些处理。
(5)afterExecute方法也可能抛出异常&#xff0c;也可能使当前线程死掉。
四、动态创建定时任务
TaskConfiguration 配置类
&#64;Configuration
&#64;EnableScheduling
&#64;Role(BeanDefinition.ROLE_INFRASTRUCTURE)public classTaskConfiguration {
&#64;Bean(name&#61;ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
&#64;Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, newDefaultThreadFactory());
}private static class DefaultThreadFactory implementsThreadFactory {private static final AtomicInteger poolNumber &#61; new AtomicInteger(1);private finalThreadGroup group;private final AtomicInteger threadNumber &#61; new AtomicInteger(1);private finalString namePrefix;
DefaultThreadFactory() {
SecurityManager s&#61;System.getSecurityManager();
group&#61; (s !&#61; null) ?s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix&#61; "pool-" &#43;poolNumber.getAndIncrement()&#43;
"-schedule-";
}
&#64;OverridepublicThread newThread(Runnable r) {
Thread t&#61; newThread(group, r,
namePrefix&#43;threadNumber.getAndIncrement(),0);if(t.isDaemon()) {
t.setDaemon(false);
}if (t.getPriority() !&#61;Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}returnt;
}
}
}
1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor&#xff0c;而是corePoolSize&#61;5的线程池
2、自定义线程池工厂类
DynamicTask 动态定时任务
&#64;Configurationpublic class DynamicTask implementsSchedulingConfigurer {private static Logger LOGGER &#61; LoggerFactory.getLogger(DynamicTask.class);private static final ExecutorService es &#61; new ThreadPoolExecutor(10, 20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(10),newDynamicTaskConsumeThreadFactory());private volatileScheduledTaskRegistrar registrar;private final ConcurrentHashMap> scheduledFutures &#61; new ConcurrentHashMap<>();private final ConcurrentHashMap cronTasks &#61; new ConcurrentHashMap<>();private volatile List taskConstants &#61;Lists.newArrayList();
&#64;Overridepublic voidconfigureTasks(ScheduledTaskRegistrar registrar) {this.registrar &#61;registrar;this.registrar.addTriggerTask(() ->{if (!CollectionUtils.isEmpty(taskConstants)) {
LOGGER.info("检测动态定时任务列表...");
List tts &#61; new ArrayList<>();
taskConstants
.forEach(taskConstant->{
TimingTask tt&#61; newTimingTask();
tt.setExpression(taskConstant.getCron());
tt.setTaskId("dynamic-task-" &#43;taskConstant.getTaskId());
tts.add(tt);
});this.refreshTasks(tts);
}
}
, triggerContext-> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
}public ListgetTaskConstants() {returntaskConstants;
}private void refreshTasks(Listtasks) {//取消已经删除的策略任务
Set taskIds &#61;scheduledFutures.keySet();for(String taskId : taskIds) {if (!exists(tasks, taskId)) {
scheduledFutures.get(taskId).cancel(false);
}
}for(TimingTask tt : tasks) {
String expression&#61;tt.getExpression();if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
LOGGER.error("定时任务DynamicTask cron表达式不合法: " &#43;expression);continue;
}//如果配置一致&#xff0c;则不需要重新创建定时任务
if(scheduledFutures.containsKey(tt.getTaskId())&&cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {continue;
}//如果策略执行时间发生了变化&#xff0c;则取消当前策略的任务
if(scheduledFutures.containsKey(tt.getTaskId())) {
scheduledFutures.remove(tt.getTaskId()).cancel(false);
cronTasks.remove(tt.getTaskId());
}
CronTask task&#61; new CronTask(tt, expression);
ScheduledFuture> future &#61;registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);
}
}private boolean exists(Listtasks, String taskId) {for(TimingTask task : tasks) {if(task.getTaskId().equals(taskId)) {return true;
}
}return false;
}
&#64;PreDestroypublic voiddestroy() {this.registrar.destroy();
}public static classTaskConstant {privateString cron;privateString taskId;publicString getCron() {returncron;
}public voidsetCron(String cron) {this.cron &#61;cron;
}publicString getTaskId() {returntaskId;
}public voidsetTaskId(String taskId) {this.taskId &#61;taskId;
}
}private class TimingTask implementsRunnable {privateString expression;privateString taskId;publicString getTaskId() {returntaskId;
}public voidsetTaskId(String taskId) {this.taskId &#61;taskId;
}
&#64;Overridepublic voidrun() {//设置队列大小10
LOGGER.error("当前CronTask: " &#43; this);
DynamicBlockingQueue queue&#61; new DynamicBlockingQueue(3);
es.submit(()->{while (!queue.isDone() || !queue.isEmpty()) {try{
String content&#61; queue.poll(500, TimeUnit.MILLISECONDS);if(StringUtils.isBlank(content)) {return;
}
LOGGER.info("DynamicBlockingQueue 消费&#xff1a;" &#43;content);
TimeUnit.MILLISECONDS.sleep(500);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
});//队列放入数据
for (int i &#61; 0; i <5; &#43;&#43;i) {try{
queue.put(String.valueOf(i));
LOGGER.info("DynamicBlockingQueue 生产&#xff1a;" &#43;i);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
queue.setDone(true);
}publicString getExpression() {returnexpression;
}public voidsetExpression(String expression) {this.expression &#61;expression;
}
&#64;OverridepublicString toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE
,false,false, TimingTask.class);
}
}/*** 队列消费线程工厂类*/
private static class DynamicTaskConsumeThreadFactory implementsThreadFactory {private static final AtomicInteger poolNumber &#61; new AtomicInteger(1);private finalThreadGroup group;private final AtomicInteger threadNumber &#61; new AtomicInteger(1);private finalString namePrefix;
DynamicTaskConsumeThreadFactory() {
SecurityManager s&#61;System.getSecurityManager();
group&#61; (s !&#61; null) ?s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix&#61; "pool-" &#43;poolNumber.getAndIncrement()&#43;
"-dynamic-task-";
}
&#64;OverridepublicThread newThread(Runnable r) {
Thread t&#61; newThread(group, r,
namePrefix&#43;threadNumber.getAndIncrement(),0);if(t.isDaemon()) {
t.setDaemon(false);
}if (t.getPriority() !&#61;Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}returnt;
}
}private static class DynamicBlockingQueue extends LinkedBlockingQueue{
DynamicBlockingQueue(intcapacity) {super(capacity);
}private volatile boolean done &#61; false;public booleanisDone() {returndone;
}public void setDone(booleandone) {this.done &#61;done;
}
}
}
1、taskConstants 动态任务列表
2、ScheduledTaskRegistrar#addTriggerTask 添加动态周期定时任务&#xff0c;检测动态任务列表的变化
CronTask task &#61; new CronTask(tt, expression);
ScheduledFuture> future &#61; registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);
3、动态创建cron定时任务&#xff0c;拿到ScheduledFuture实例并缓存起来
4、在刷新任务列表时&#xff0c;通过缓存的ScheduledFuture实例和CronTask实例&#xff0c;来决定是否取消、移除失效的动态定时任务。
DynamicTaskTest 动态定时任务测试类
&#64;RunWith(SpringRunner.class)
&#64;SpringBootTestpublic classDynamicTaskTest {
&#64;AutowiredprivateDynamicTask dynamicTask;
&#64;Testpublic void test() throwsInterruptedException {
List taskConstans &#61;dynamicTask.getTaskConstants();
DynamicTask.TaskConstant taskConstant&#61; newDynamicTask.TaskConstant();
taskConstant.setCron("0/5 * * * * ?");
taskConstant.setTaskId("test1");
taskConstans.add(taskConstant);
DynamicTask.TaskConstant taskConstant1&#61; newDynamicTask.TaskConstant();
taskConstant1.setCron("0/5 * * * * ?");
taskConstant1.setTaskId("test2");
taskConstans.add(taskConstant1);
DynamicTask.TaskConstant taskConstant2&#61; newDynamicTask.TaskConstant();
taskConstant2.setCron("0/5 * * * * ?");
taskConstant2.setTaskId("test3");
taskConstans.add(taskConstant2);
TimeUnit.SECONDS.sleep(40);//移除并添加新的配置
taskConstans.remove(taskConstans.size() - 1);
DynamicTask.TaskConstant taskConstant3&#61; newDynamicTask.TaskConstant();
taskConstant3.setCron("0/5 * * * * ?");
taskConstant3.setTaskId("test4");
taskConstans.add(taskConstant3);// TimeUnit.MINUTES.sleep(50);
}
}