热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot配置线程池拒绝策略的场景分析(妥善处理好溢出的任务)_java

本文通过实例代码给大家介绍下如何为线程池配置拒绝策略、如何自定义拒绝策略。对SpringBoot配置线程池拒绝策略的相关知识感

通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了用@Async创建异步任务、为异步任务配置线程池、使用多个线程池隔离不同的异步任务。今天这篇,我们继续对上面的知识进行完善和优化!

如果你已经看过上面几篇内容并已经掌握之后,一起来思考下面这个问题:

假设,线程池配置为核心线程数2、最大线程数2、缓冲队列长度2。此时,有5个异步任务同时开始,会发生什么?

场景重现

我们先来把上面的假设用代码实现一下:

第一步:创建Spring Boot应用,根据上面的假设写好线程池配置。

@EnableAsync
@SpringBootApplication
public class Chapter78Application {

    public static void main(String[] args) {
        SpringApplication.run(Chapter78Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(2);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            return executor;
        }

    }

}

第二步:用@Async注解实现一个部分任务

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    @Async("taskExecutor1")
    public CompletableFuture doTaskOne(String taskNo) throws Exception {
        log.info("开始任务:{}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
        return CompletableFuture.completedFuture("任务完成");
    }

}

第三步:编写测试用例

@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test2() throws Exception {
        // 线程池配置:core-2,max-2,queue=2,同时有5个任务,出现下面异常:
        // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
        // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9

        long start = System.currentTimeMillis();

        // 线程池1
        CompletableFuture task1 = asyncTasks.doTaskOne("1");
        CompletableFuture task2 = asyncTasks.doTaskOne("2");
        CompletableFuture task3 = asyncTasks.doTaskOne("3");
        CompletableFuture task4 = asyncTasks.doTaskOne("4");
        CompletableFuture task5 = asyncTasks.doTaskOne("5");

        // 一起执行
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

        long end = System.currentTimeMillis();

        log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }

}

执行一下,可以类似下面这样的日志信息:

2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-2] com.didispace.chapter78.AsyncTasks       : 开始任务:2
2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-1] com.didispace.chapter78.AsyncTasks       : 开始任务:1

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732

 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
 at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
 at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
 at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
 at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
 at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
 at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne()
 at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
 at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
 at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
 at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at java.util.ArrayList.forEach(ArrayList.java:1255)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at java.util.ArrayList.forEach(ArrayList.java:1255)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
 at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
 at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
 at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
 at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
 at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
 at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
 at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
 at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
 at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
 ... 74 more

从异常信息org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: 中,可以很明确的知道,第5个任务因为超过了执行线程+缓冲队列长度,而被拒绝了。

所有,默认情况下,线程池的拒绝策略是:当线程池队列满了,会丢弃这个任务,并抛出异常。

配置拒绝策略

虽然线程池有默认的拒绝策略,但实际开发过程中,有些业务场景,直接拒绝的策略往往并不适用,有时候我们可能会选择舍弃最早开始执行而未完成的任务、也可能会选择舍弃刚开始执行而未完成的任务等更贴近业务需要的策略。所以,为线程池配置其他拒绝策略或自定义拒绝策略是很常见的需求,那么这个要怎么实现呢?

下面就来具体说说今天的正题,如何为线程池配置拒绝策略、如何自定义拒绝策略。

看下面这段代码的最后一行,setRejectedExecutionHandler方法就是为线程池设置拒绝策略的方法:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//...其他线程池配置

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor中提供了4种线程的策略可以供开发者直接使用,你只需要像下面这样设置即可:

// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

这四个策略对应的含义分别是:

  • AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
  • DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
  • DiscardOldestPolicy策略:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
  • CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。

而如果你要自定义一个拒绝策略,那么可以这样写:

executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 拒绝策略的逻辑
    }
});

当然如果你喜欢用Lamba表达式,也可以这样写:

executor.setRejectedExecutionHandler((r, executor1) -> {
    // 拒绝策略的逻辑
});

好了,今天的学习就到这里!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-8工程:

  • Github:https://github.com/dyc87112/SpringBoot-Learning/
  • Gitee:https://gitee.com/didispace/SpringBoot-Learning/

推荐阅读
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 集合的遍历方式及其局限性
    本文介绍了Java中集合的遍历方式,重点介绍了for-each语句的用法和优势。同时指出了for-each语句无法引用数组或集合的索引的局限性。通过示例代码展示了for-each语句的使用方法,并提供了改写为for语句版本的方法。 ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • IhaveconfiguredanactionforaremotenotificationwhenitarrivestomyiOsapp.Iwanttwodiff ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文介绍了一种划分和计数油田地块的方法。根据给定的条件,通过遍历和DFS算法,将符合条件的地块标记为不符合条件的地块,并进行计数。同时,还介绍了如何判断点是否在给定范围内的方法。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Android JSON基础,音视频开发进阶指南目录
    Array里面的对象数据是有序的,json字符串最外层是方括号的,方括号:[]解析jsonArray代码try{json字符串最外层是 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
author-avatar
mb97ujp
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有