热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:任务调度(02)SpringSchedule

篇首语:本文由编程笔记#小编为大家整理,主要介绍了任务调度(02)SpringSchedule相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了任务调度(02)Spring Schedule相关的知识,希望对你有一定的参考价值。



任务调度(02)Spring Schedule
目录
  • 任务调度(02)Spring Schedule
    • 1. 基本用法
      • 1.1 定时调度 - 基于xml
      • 1.2 定时调度 - 注解驱动
      • 1.3 异步调度 - 注解驱动
    • 2. @EnableScheduling 源码分析
      • 2.1 @Scheduled 注解扫描
      • 2.2 定时任务触发
      • 2.3 TaskScheduler
        • 2.3.1 自定义 TaskScheduler
        • 2.3.2 TaskScheduler接口
        • 2.3.3 TaskScheduler实现类
    • 3. @EnableAsync 源码分析
      • 3.1 @EnableAsync
      • 3.2 @Async
      • 3.3 执行流程
        • 3.3.1 初始化流程
        • 3.3.2 AsyncAnnotationBeanPostProcessor
        • 3.3.3 AsyncAnnotationAdvisor
        • 3.3.4 执行流程

Spring 3.0 提供两种任务调度方式:一是定时任务调度;二是异步任务调度。这两种任务调度方式都是基于 JUC 实现的,是一种非常轻量级的任务调度方式。同时在 spring-context-support 中也整合了 Quartz,文本重点关注 Spring 提供了原生任务调度方式 - @EnableScheduling 和 @EnableAsync。



  1. 定时任务调度和异步任务调度的基本使用方法。

  2. 定时任务调度的源码分析。ScheduledAnnotationBeanPostProcessor 会扫描标注有 @Scheduled 注解的方法,将其注册到 ScheduledTaskRegistrar 中,真正调度任务的类是 ScheduledTaskRegistrar 中的TaskScheduler。TaskScheduler 实现类最底层是 ScheduledExecutorService,由 JUC 提供。

  3. 异步任务调度的源码分析。异步执行和定时执行的最大区别是异步执行可能需要返回执行结果,所以需要对标注有 @Async 的类进行代理。异步任务有两种执行模式:代理 或 Aspectj,无论那种方法都是以 Spring AOP 作为支撑。AsyncAnnotationBeanPostProcessor 首先创建 AsyncAnnotationAdvisor,如果类或方法上标注有 @Async 注解,则使用 proxyFactory 生成代理对象。


1. 基本用法


1.1 定时调度 - 基于xml


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">













总结: Spring 任务调度总共有 4 个标签:



  1. task:scheduler:配置调度器,实现了 org.springframework.scheduling.TaskScheduler 接口,默认是 org.springframework.scheduling.TaskScheduler。

  2. task:scheduled-tasks:配置任务,如果不指定 scheduler 属性,那么所有的任务都会在一个线程上执行,可能会造成线程阻塞。

  3. task:executor:异步任务调度的执行线程,实现 Executor 接口。

  4. task:annotation-driven:开启注解驱动,包括 @Scheduled 和 @Async。其中 executor 为异步调度的执行线程池,scheduler 为定时调度的执行线程池。同样不配置的话,所有的 @Scheduled 任务都是一个线程上执行。


1.2 定时调度 - 注解驱动

@EnableScheduling
public class ScheduledTest {
private Logger logger = LoggerFactory.getLogger(ScheduledTest.class);
public static void main(String[] args) {
SpringApplication.run(ScheduledTest.class);
}
@Scheduled(fixedDelay = 2000)
public void fixedDelay() {
logger.info("fixedDelay");
}
@Scheduled(fixedRate = 2000)
public void fixedRate() {
logger.info("fixedRate");
}
@Scheduled(cron = "0/2 * * * * *")
public void cron() {
logger.info("cron");
}
}

总结: fixedDelay、fixedRate、cron 都是定时的任务调度,那它们有什么区别呢?



  • fixedDelay:上一个任务执行结束和下一个任务开始的时间间隔。总时间间隔=方法执行时间 + fixedDelay。

  • fixedRate:上一个任务执行开始和下一个任务开始的时间间隔。

  • cron:类似 Linux crontab,如果任务执行时间太长,就会忽略过期的任务。


1.3 异步调度 - 注解驱动

@Async 可以标注在类或方法上,如果标注在类上则表示这个类的所有方法都是异步执行的。

@EnableAsync
//@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncTest.class);
public static void main(String[] args) {
ConfigurableApplicationContext cOntext= SpringApplication.run(AsyncTest.class);
AsyncTest asyncTest = context.getBean(AsyncTest.class);
asyncTest.async();
logger.info("main method");
}
@Async
public void async() {
logger.info("async method");
}
}

日志如下:

INFO 2516 --- [ main] AsyncTest : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest : async method

总结: 可以发现这两条日志输出的线程不一致,而且是 main 线程先输出日志,也就是 @Async 注解后,方法是异步执行的。

注意: @Async 标注在类上,则这类的所有方法都是异常执行。

接下来进行源码分析。Spring 任务调度的源码位于 spring-context 工程的 scheduling 包中,其目录如下:

scheduling
|- annotation # @Scheduled和@Async注解支持
|- concurrent # juc扩展
|- config # 传统的xml解析支持
|- support # cron解析支持

2. @EnableScheduling 源码分析


Spring 定时调度流程图

sequenceDiagram
@EnableScheduling ->> ScheduledAnnotationBeanPostProcessor : @Import
alt 步骤1:postProcessAfterInitialization
ScheduledAnnotationBeanPostProcessor ->> @Scheduled : selectMethods
ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleCronTask
ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleFixedDelayTask
ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleFixedRateTask
end
alt 步骤2:onApplicationEvent
ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : setTaskScheduler
ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleTasks
end

总结: 上述的步骤,有两个重要的入口:



  1. 实现 BeanPostProcessor#postProcessAfterInitialization 接口,在每个 bean 实例化完成后,扫描 @Scheduled 注解,注册到 ScheduledTaskRegistrar 中。

  2. 监听 Spring 的容器初始化事件 onApplicationEvent(ContextRefreshedEvent event) 事件,回调 finishRegistration 方法设置 ScheduledTaskRegistrar 的 TaskScheduler。在配置 ScheduledTaskRegistrar 后,调用其 scheduleTasks 方法触发任务调度。

  3. 真正调度任务的类是 TaskScheduler 实现类中的 ScheduledExecutorService,由 JUC 提供。

  4. 查找 TaskScheduler 的实现类实例默认是通过类型查找,若有多个实现则会查找名字为 "taskScheduler" 的实现 Bean,若没有找到则在 ScheduledTaskRegistrar 调度任务的时候会创建一个 newSingleThreadScheduledExecutor , 将 TaskScheduler 的实现类实例设置到 ScheduledTaskRegistrar 属性中。


2.1 @Scheduled 注解扫描

@Scheduled 注解的处理分两步:一是 MethodIntrospector.selectMethods 扫描该 bean 的所有 @@Scheduled和@Schedules注解的方法;二是 processScheduled 方法处理有 @Scheduled 注解的方法。

(1)@Scheduled 扫描

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
return bean;
}
Class targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
// 1. 扫描每个方法上的@Scheduled和@Schedules注解
Map> annotatedMethods =
MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup>)
method -> {
Set scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
// 2. 调用processScheduled处理所有的@Scheduled注解的方法
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
} else {
annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled ->
processScheduled(scheduled, method, bean)));
}
}
return bean;
}

总结: 核心步骤就两步:第一步是 MethodIntrospector.selectMethods 扫描所有方法的 @Scheduled 和@Schedules注解。至于第二步,最核心的处理则委托给了 processScheduled 方法处理。

(2)processScheduled

processScheduled 分别处理 @Scheduled 中的三种情况:cron、fixedDelay、fixedRate 三种场景,将其分别包装成 CronTask、FixedDelayTask、FixedRateTask 后,注册到 ScheduledTaskRegistrar 上,当调用其 scheduleTasks 方法触发任务调度。

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
Set tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
...
// 1. Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zOne= scheduled.zone();
if (StringUtils.hasLength(cron)) {
if (!Scheduled.CRON_DISABLED.equals(cron)) {
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// 2. Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
// 3. Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}

// 4. Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
} catch (IllegalArgumentException ex) {
throw new IllegalStateException();
}
}

总结: processScheduled 代码有删减,只保留了核心的逻辑。processScheduled 代码很简单,接下来继续分析定时任务触发流程。


2.2 定时任务触发

ScheduledAnnotationBeanPostProcessor 会通过 onApplicationEvent(ContextRefreshedEvent event) 监听容器初始化事件。一旦容器初始化完成,就会试图从容器中获取 TaskScheduler。最终调用 scheduleTasks 触发定时任务调度。

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
private void finishRegistration() {
// 1. 手动配置 TaskScheduler
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}

// 2. 自定义配置ScheduledTaskRegistrar。获取容器中的SchedulingConfigurer
if (this.beanFactory instanceof ListableBeanFactory) {
Map beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List cOnfigurers= new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 3. 如果没有配置TaskScheduler,则从Spring容器中查找。查找规则如下:
// 1) 查找TaskScheduler,如果有多个则查找名称为"taskScheduler"
// 2) 查找ScheduledExecutorService,如果有多个则查找名称为"taskScheduler"
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
} catch (NoUniqueBeanDefinitionException ex) {
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
} catch (NoSuchBeanDefinitionException ex2) {
}
} catch (NoSuchBeanDefinitionException ex) {
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
} catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
} catch (NoSuchBeanDefinitionException ex3) {
}
} catch (NoSuchBeanDefinitionException ex2) {
}
}
}
// 4. 触发定时任务调度
this.registrar.afterPropertiesSet();
}

总结: 一旦 Spring 容器初始化完成,就会触发定时任务的执行。在 finishRegistration 方法中,主要是配置定时任务执行器,并启动定时任务。



  1. TaskScheduler 配置:手动配置scheduler -> TaskScheduler -> TaskScheduler("taskScheduler") -> ScheduledExecutorService -> ScheduledExecutorService("taskScheduler")

  2. ScheduledTaskRegistrar 配置:自定义 SchedulingConfigurer。

  3. 触发定时任务调度:调用 ScheduledTaskRegistrar#scheduleTasks。

  4. TaskScheduler 是定时任务的最终执行器,底层的实现类是 ScheduledExecutorService,由 JUC 提供。可以看到所有的 @Scheduled 注解的定时任务都会在同一个 TaskScheduler 中执行,不像 xml 一样可以指定不同的 TaskScheduler。所以使用时要注意方法是否会阻塞,或将配置合适的 TaskScheduler 线程池大小。


2.3 TaskScheduler

TaskScheduler 是定时任务的最终执行器,底层的实现类是 ScheduledExecutorService,由 JUC 提供。


2.3.1 自定义 TaskScheduler

Spring Boot 自定义 TaskScheduler 如下:

@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数10:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
// 缓冲队列200:用来缓冲执行任务的队列
executor.setQueueCapacity(200);
// 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("taskExecutor-");
// 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,
// 当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;
// 如果执行程序已关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

2.3.2 TaskScheduler接口


图2 TaskScheduler接口

(1)schedule(Runnable task, Trigger trigger);

指定一个触发器执行定时任务。可以使用 CronTrigger 来指定 Cron 表达式,执行定时任务

CronTrigger t = new CronTrigger("0 0 10,14,16 * * ?");
taskScheduler.schedule(this, t);

(2)schedule(Runnable task, Date startTime);

指定一个具体时间点执行定时任务,可以动态的指定时间,开启任务。只执行一次。

(3)scheduleAtFixedRate(Runnable task, long period);

立即执行,循环任务,指定一个执行周期(毫秒计时)。

PS: 不管上一个周期是否执行完,到时间下个周期就开始执行。

(4)scheduleAtFixedRate(Runnable task, Date startTime, long period);

指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)。

PS: 不管上一个周期是否执行完,到时间下个周期就开始执行。

(5)scheduleWithFixedDelay(Runnable task, long delay);

立即执行,循环任务,指定一个间隔周期(毫秒计时)。

PS: 上一个周期执行完,等待delay时间,下个周期开始执行。

(6)scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)


2.3.3 TaskScheduler实现类


图2 TaskScheduler实现类

TaskScheduler 有三个实现类。

(1)ConcurrentTaskScheduler

ConcurrentTaskScheduler 如果不设置 ScheduledExecutorService,则默认通过 Executors.newSingleThreadScheduledExecutor() 创建单线程池执行。

private ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler();
taskScheduler.schedule(task, new Date());

(2)DefaultManagedTaskScheduler

以当前线程执行任务,这是 ConcurrentTaskScheduler 的子类,添加了 JNDI 的支持。和ConcurrentTaskScheduler 一样的用法,需要使用 JNDI 可以单独设置

(3)ThreadPoolTaskScheduler

TaskScheduler 接口的默认实现类,多线程定时任务执行。可以设置执行线程池数(默认一个线程)。


3. @EnableAsync 源码分析

Spring 异步任务调度,使用了动态代理( JDK 或 CGLIB),依赖 Spring Aop,支持动态代理或 ASPECTJ 两种模式。


3.1 @EnableAsync

@EnableAsync 是注解驱动的入口,用于配置任务调度的模式。

@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
}

总结: @EnableAsync 注解有两个重要的属性:



  • mode:表示使用动态代理(PROXY时使用 JDK 或 CGLIB 代理)还是 ASPECTJ 方式(AdviceMode.ASPECTJ)。默认使用 AdviceMode.PROXY,也就是动态代理。如果使用 AdviceMode.ASPECTJ 模式需要引入 spring-aspectj 包。

  • proxyTargetClass :表示使用 JDK 代理(false)还是CGLIB 代理(true)。默认使用 false,也就是 JDK 动态代理。

  • annotation:表示自定义注解,默认 @Async。


3.2 @Async

@Async 可以标注在类或方法上,如果在类上,则表示这个类的所有方法都异步执行。

@Async("myExecuter")
public void async() {
logger.info("async method");
}

总结: 和 @Scheduled 不同,@Async 可以指定执行的线程池,默认是执行器是 SimpleAsyncTaskExecutor,即每次执行时都会创建一个新的线程执行任务。可以从日志中看出不配置执行线程池的线程名称(线程名SimpleAsyncTaskExecutor,日志输出时线程名称有截断):

INFO 2516 --- [ main] AsyncTest : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest : async method

3.3 执行流程

@EnableAsync 通过 @Import 向 Spring 容器中注入 AsyncConfigurationSelector。上文提到 @EnableAsync 有两种执行模式:动态代理或 Aspectj,AsyncConfigurationSelector 会根据配置动态注入 ProxyAsyncConfiguration 或 AspectJAsyncConfiguration(需要引入 spring-aspectj 包)。

public class AsyncConfigurationSelector extends AdviceModeImportSelector {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}

总结: Spring 默认为 PROXY 代理模式,即使用 ProxyAsyncConfiguration。下文的源码分析也是从 ProxyAsyncConfiguration 展开。


3.3.1 初始化流程


图2 ProxyAsyncConfiguration 初始化流程

sequenceDiagram
participant ProxyAsyncConfiguration
participant AsyncConfigurer
participant AsyncAnnotationBeanPostProcessor
participant AsyncAnnotationAdvisor
participant AbstractAdvisingBeanPostProcessor
participant ProxyFactory
ProxyAsyncConfiguration ->> AsyncConfigurer : executor/exceptionHandler
ProxyAsyncConfiguration ->> AsyncAnnotationBeanPostProcessor : @Bean
AsyncAnnotationBeanPostProcessor ->> AsyncAnnotationAdvisor: setBeanFactory
AsyncAnnotationBeanPostProcessor ->> AbstractAdvisingBeanPostProcessor : postProcessAfterInitialization
AbstractAdvisingBeanPostProcessor ->> ProxyFactory : prepareProxyFactory
AbstractAdvisingBeanPostProcessor ->> ProxyFactory : addAdvisor
AbstractAdvisingBeanPostProcessor ->> ProxyFactory : getProxy
ProxyFactory -->> AsyncAnnotationBeanPostProcessor : proxy

总结: Spring 异步任务调度最核心的步骤是生成切面 AsyncAnnotationAdvisor,至于其它的执行逻辑则是依靠 Spring AOP 实现的。主要的核心类如下:



  1. ProxyAsyncConfiguration:配置类,向 Spring 容器中注入 AsyncConfigurer 和 AsyncAnnotationBeanPostProcessor。

  2. AsyncConfigurer:可以自定义异步执行的线程线程池(executor)和异常(exceptionHandler)处理机制。使用时,只需要将实现了 AsyncConfigurer 接口的类注入到容器中即可。

  3. AsyncAnnotationBeanPostProcessor:最核心类,主要完成两件事:一是 AsyncAnnotationBeanPostProcessor 初始化时,会执行 setBeanFactory 方法初始化 AsyncAnnotationAdvisor,二是在 bean 初始化完成后生成代理类。Spring 异步任务调度实际上是依赖 Spring AOP 机制。

  4. AsyncAnnotationAdvisor:最核心类,包含 切入点(Pointcut)通知(Advice)

  5. AbstractAdvisingBeanPostProcessor(Spring AOP):在 bean 实现完成后,根据 ProxyFactory 生成代理类。

  6. ProxyFactory(Spring AOP):代理工厂。有两种代理方式:JdkDynamicAopProxy 或 ObjenesisCglibAopProxy。


3.3.2 AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor 主要的工作是初始化 AsyncAnnotationAdvisor,这是 Spring 异步执行的基石。AsyncAnnotationAdvisor 相当于 Spring AOP 的 @Aspect 注解,定义了切入点(Pointcut)和通知(Advice)。

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}

总结: Spring 异步任务调度除了 AsyncAnnotationAdvisor 不一样,其余的执行流程和 Spring AOP 一模一样。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
return bean;
}
...
// 生成动态代理
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
return bean; // No proxy needed.
}

总结: 首先判断是否需要进行动态代理,如果类或方法上标注有 @Async 注解,则使用 proxyFactory 生成代理对象。

protected boolean isEligible(Class targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
// 最终使用 advisor.putcut 判断是否需要进行代理
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}

3.3.3 AsyncAnnotationAdvisor

AsyncAnnotationAdvisor 主要是定义了 AOP 执行的通知(Advice)和切入点(Pointcut)。

(1)构造器

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;

public AsyncAnnotationAdvisor(
Supplier executor, Supplier exceptionHandler) {
Set> asyncAnnotatiOnTypes= new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
} catch (ClassNotFoundException ex) {
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
}

总结: 从上也可以看出 Spring 除了支持 @Async 注解外,还支持 @Asynchronous 注解。另外就是创建通知(advice)和切面(pointcut)。

其实我们也大致可以猜到,AsyncAnnotationAdvisor 实际上将标注有 @Async 注解的方法丢到线程池中异步执行。pointcut 根据注解判断是否需要异步执行,advice 则是具体的执行逻辑。

(2)buildPointcut

buildPointcut 建立切入点,也就是判断是否需要异步执行。

protected Pointcut buildPointcut(Set> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class asyncAnnotationType : asyncAnnotationTypes) {
// 匹配类上的注解
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// 匹配方法上的注解
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
} else {
// union:表示所有的匹配规则只要有一个匹配上了就返回匹配成功
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}

总结: buildPointcut 建立匹配规则,实际上就是只要类或方法上有一个匹配成功,就执行任务调度。

(3)buildAdvice

buildAdvice 方法包含异步执行的逻辑。

protected Advice buildAdvice(
Supplier executor, Supplier exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

总结: buildAdvice 方法很简单,将异步执行的逻辑全部委托给了 AnnotationAsyncExecutionInterceptor 完成。

AnnotationAsyncExecutionInterceptor 继承自 AsyncExecutionInterceptor(Spring AOP),具体的执行逻辑由 Spring AOP 完成。至此容器初始化时,代理已经生成完毕。


3.3.4 执行流程

Spring 异步任务执行的入口是 AnnotationAsyncExecutionInterceptor,继承自 AsyncExecutionInterceptor。在 Spring AOP 执行时会回调其 invoke 方法。

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// `1. 获取最原始的方法 userDeclaredMethod,去除代理,桥接
Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 2. 根据方法获取任务执行的线程池,@Async可以指定要执行的线程池名称
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(");
}
// 3. 任务封装成 Callable
Callable task = () -> {
try {
// 执行任务
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future) result).get();
}
} catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};

// 4. 提交任务
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

总结: invoke 方法首先要获取要异步执行的线程池,将任务封装成一个 Callable 后,丢到该线程池中执行。



  1. 获取任务执行的线程池:determineAsyncExecutor

  2. 任务执行并返回执行结果:doSubmit

(1)determineAsyncExecutor

determineAsyncExecutor 方法用于获取任务将要在哪个线程池上执行,默认是 SimpleAsyncTaskExecutor。

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 1. 缓存中获取
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 2. @Async 注解指定执行线程池名称
String qualifier = getExecutorQualifier(method);
// 3. 如果指定了线程池名称,直接从 Spring 容器中获取
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
// 4. 获取默认的线程池
} else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
// 5. 包装成 AsyncListenableTaskExecutor
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}

总结: determineAsyncExecutor 的逻辑很清晰,如果指定了线程池名称,则直接从 Spring 容器中获取,如果没有指定,则获取默认的线程池。最后将其包装成 AsyncListenableTaskExecutor。

至于默认线程池的设置也很简单,如果 Spring 容器中设置了实现 TaskExecutor 接口的 bean,则直接返回;如果有多个,则返回名称为 ”taskExecutor“ 的 TaskExecutor ;如果容器中没有,则返回默认的 SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor 非常简单,每次执行时都会创建一个新的线程执行任务。代码如下:

// AsyncExecutionInterceptor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
// 1. 从Spring容器获取TaskExecutor类型的bean,如果有多个,则选择名称为"taskExecutor"
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
// 2. 如果没有定义,则选择默认的SimpleAsyncTaskExecutor。
// SimpleAsyncTaskExecutor很简单,每次执行时,都会创建一个新的线程执行任务
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

(2)doSubmit

doSubmit 处理结果的异步返回。

protected Object doSubmit(Callable task, AsyncTaskExecutor executor, Class returnType) {
// 1. 异步执行。jdk CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
// 2. 结果回调。ListenableFuture
} else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
// 3. 同步返回。jdk Future
} else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
// 4. 不返回任务结果
} else {
executor.submit(task);
return null;
}
}


每天用心记录一点点。内容也许不重要,但习惯很重要!



推荐阅读
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • 本文介绍了使用kotlin实现动画效果的方法,包括上下移动、放大缩小、旋转等功能。通过代码示例演示了如何使用ObjectAnimator和AnimatorSet来实现动画效果,并提供了实现抖动效果的代码。同时还介绍了如何使用translationY和translationX来实现上下和左右移动的效果。最后还提供了一个anim_small.xml文件的代码示例,可以用来实现放大缩小的效果。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 网络请求模块选择——axios框架的基本使用和封装
    本文介绍了选择网络请求模块axios的原因,以及axios框架的基本使用和封装方法。包括发送并发请求的演示,全局配置的设置,创建axios实例的方法,拦截器的使用,以及如何封装和请求响应劫持等内容。 ... [详细]
  • 本文讨论了如何使用Web.Config进行自定义配置节的配置转换。作者提到,他将msbuild设置为详细模式,但转换却忽略了带有替换转换的自定义部分的存在。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
author-avatar
完颜777_870
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有