多线程异步任务的事务协调与管理
作者:mobiledu2502891413 | 来源:互联网 | 2024-12-22 19:11
本文介绍了如何在多线程环境中实现异步任务的事务控制,确保任务执行的一致性和可靠性。通过使用计数器和异常标记字段,系统能够准确判断所有异步线程的执行结果,并根据结果决定是否回滚或提交事务。
为了在多线程环境下有效管理和协调异步任务的事务,我们设计了一个名为 `ThreadTransaction` 的类。该类利用了 Java 并发工具包(JUC)中的线程池以及 Spring 框架提供的事务管理功能,确保任务执行的原子性和一致性。 ### 类结构及功能 - **日志记录**:通过 `LoggerFactory` 获取日志记录器,用于跟踪任务执行过程中的关键事件。 - **事务管理**:依赖于 `PlatformTransactionManager` 实现事务控制,确保每个任务的执行都在独立的事务中进行。 - **线程池配置**:支持两种类型的线程池——标准 JUC 线程池 (`ThreadPoolExecutor`) 和 Spring 提供的线程池 (`ThreadPoolTaskExecutor`)。 - **任务列表**:使用 `List` 存储待执行的任务,确保任务可以动态添加和管理。 - **执行计数器**:通过 `CountDownLatch` 实现对并发任务的同步控制,确保所有任务完成后才能进行最终的事务决策。 - **异常标记**:使用 `AtomicReference` 来追踪任务执行过程中是否有异常发生。 ### 构造方法 `ThreadTransaction` 提供了多种构造方法,以适应不同的线程池配置需求: 1. 使用 `PlatformTransactionManager` 和 `ThreadPoolTaskExecutor` 初始化。 2. 使用 `PlatformTransactionManager` 和 `ThreadPoolExecutor` 初始化。 3. 支持指定初始任务数量的构造方法。 ### 核心方法 - **addFunction(Supplier supplier)**:将需要异步执行的任务添加到任务列表中。 - **execute()**:启动所有异步任务并等待它们完成。根据异常标记字段的结果,决定是提交还是回滚事务。 #### 内部类 TransactionRunnable `TransactionRunnable` 是一个实现了 `Runnable` 接口的内部类,负责具体任务的执行和事务管理。它会在每个任务开始时创建一个新的事务,并在任务完成后根据全局异常标志决定事务的提交或回滚。 ```java public class ThreadTransaction { private final Logger LOG = LoggerFactory.getLogger(ThreadTransaction.class); private PlatformTransactionManager platformTransactionManager; private ThreadPoolExecutor threadPoolExecutor; private ThreadPoolTaskExecutor threadPoolTaskExecutor; private List supplierList = new ArrayList<>(); private volatile CountDownLatch countDownLatch; private AtomicReference isError = new AtomicReference<>(false); public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolTaskExecutor threadPoolTaskExecutor) { this.platformTransactiOnManager= platformTransactionManager; this.threadPoolTaskExecutor = threadPoolTaskExecutor; } public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolExecutor threadPoolExecutor) { this.platformTransactiOnManager= platformTransactionManager; this.threadPoolExecutor = threadPoolExecutor; } public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolTaskExecutor threadPoolTaskExecutor, int size) { this(platformTransactionManager, threadPoolTaskExecutor); supplierList = new ArrayList<>(size); } public boolean addFunction(Supplier supplier) { return supplierList.add(supplier); } public void execute() { LOG.info("多线程事务开始..."); countDownLatch = new CountDownLatch(supplierList.size()); for (Supplier supplier : supplierList) { this.threadPoolTaskExecutor.submit(new TransactionRunnable(platformTransactionManager, supplier)); } try { countDownLatch.await(); if (isError.get()) { LOG.error("多线程执行失败,事务已回滚!"); throw new RuntimeException("多线程执行失败!"); } LOG.info("多线程执行成功,事务已提交!"); } catch (Exception e) { LOG.error("多线程执行失败:" + e.getMessage()); e.printStackTrace(); } } class TransactionRunnable implements Runnable { private PlatformTransactionManager platformTransactionManager; private Supplier supplier; public TransactionRunnable(PlatformTransactionManager platformTransactionManager, Supplier supplier) { this.platformTransactiOnManager= platformTransactionManager; this.supplier = supplier; } @Override public void run() { DefaultTransactionDefinition defaultTransactiOnDefinition= new DefaultTransactionDefinition(); defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus transaction = this.platformTransactionManager.getTransaction(defaultTransactionDefinition); try { this.supplier.get(); } catch (Exception e) { isError.set(true); LOG.error("多线程事务执行失败: {}", e.getMessage()); e.printStackTrace(); } countDownLatch.countDown(); try { if (isError.get()) { LOG.info("多线程事务(子线程)回滚"); platformTransactionManager.rollback(transaction); } else { LOG.info("多线程事务(子线程)提交"); platformTransactionManager.commit(transaction); } } catch (Exception e) { e.printStackTrace(); } } } } ``` 通过这种方式,`ThreadTransaction` 类能够在多线程环境下有效地管理异步任务的事务,确保数据一致性和系统的稳定性。
推荐阅读
本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ...
[详细]
蜡笔小新 2024-12-21 17:33:44
本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ...
[详细]
蜡笔小新 2024-12-22 16:20:33
在尝试使用C# Windows Forms客户端通过SignalR连接到ASP.NET服务器时,遇到了内部服务器错误(500)。本文将详细探讨问题的原因及解决方案。 ...
[详细]
蜡笔小新 2024-12-21 16:55:52
本文介绍如何在Spring Boot项目中集成Redis,并通过具体案例展示其配置和使用方法。包括添加依赖、配置连接信息、自定义序列化方式以及实现仓储接口。 ...
[详细]
蜡笔小新 2024-12-22 17:35:23
蜡笔小新 2024-12-20 20:43:56
本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ...
[详细]
蜡笔小新 2024-12-22 20:38:12
一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ...
[详细]
蜡笔小新 2024-12-22 20:24:15
本教程详细介绍了如何下载并安装 Android Studio,包括设置 SDK 路径和优化启动性能的方法。通过这些步骤,您可以顺利地开始开发 Android 应用。 ...
[详细]
蜡笔小新 2024-12-22 18:01:31
本实验旨在通过配置 Nginx 实现反向代理和负载均衡,确保从北京本地代理服务器访问上海的 Web 服务器时,能够依次显示红、黄、绿三种颜色页面以验证负载均衡效果。 ...
[详细]
蜡笔小新 2024-12-22 15:15:48
本文探讨了如何通过预处理器开关选择不同的类实现,并解决在特定情况下遇到的链接器错误。 ...
[详细]
蜡笔小新 2024-12-22 12:03:31
本文深入探讨了面向切面编程(AOP)的概念及其在Spring框架中的应用。通过详细解释AOP的核心术语和实现机制,帮助读者理解如何利用AOP提高代码的可维护性和开发效率。 ...
[详细]
蜡笔小新 2024-12-22 10:59:43
在高并发需求的C++项目中,我们最初选择了JsonCpp进行JSON解析和序列化。然而,在处理大数据量时,JsonCpp频繁抛出异常,尤其是在多线程环境下问题更为突出。通过分析发现,旧版本的JsonCpp存在多线程安全性和性能瓶颈。经过评估,我们最终选择了RapidJSON作为替代方案,并实现了显著的性能提升。 ...
[详细]
蜡笔小新 2024-12-21 18:13:59
This post discusses an issue encountered while using the @name annotation in documentation generation, specifically regarding nested class processing and unexpected output. ...
[详细]
蜡笔小新 2024-12-21 13:50:08
本文介绍了一种使用Selenium和ChromeDriver结合Python代码,轻松实现对豆瓣网站进行完整页面截图的方法。该方法不仅简单易行,而且解决了新版Selenium不再支持PhantomJS的问题。 ...
[详细]
蜡笔小新 2024-12-22 15:17:55
12月16日JavaScript变量、函数、流程、循环等***线上九期班 ...
[详细]
蜡笔小新 2024-12-22 12:11:30
mobiledu2502891413
这个家伙很懒,什么也没留下!