热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

atomikosJTA/XA全局事务

文章转载于:Atomikos公司官方网址为:https:www.atomikos.com。其旗下最著名的产品就是事务管理器。产品分两个版本:

文章转: 

logo

FDDADEB1-C462-492D-B243-0BCEC7AB66B1.png

    Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:

TransactionEssentials:开源的免费产品

ExtremeTransactions:上商业版,需要收费。

这两个产品的关系如下图所示: 

CF404B96-1F5C-4969-9C7F-21A83C4C54FA.png

TransactionEssentials:

1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

    UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

    TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

    Transaction实现是com.atomikos.icatch.jta.TransactionImp

2、针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

     在上一节我们讲解JTA规范时,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器(TM),并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。典型的XADataSource实现包括:

    1、mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

    2、阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource

    3、tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource

    而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常: 

com.atomikos.jdbc.AtomikosSQLException: The class 'com.mchange.v2.c3p0.ComboPooledDataSource'specified by property 'xaDataSourceClassName' does not implement the required interface javax.jdbc.XADataSource. Please make sure the spelling is correct, and check your JDBC driver vendor's documentation.

ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:

支持TCC:这是一种柔性事务

支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。

本文主要针对Atomikos开源版本的事务管理器实现TransactionEssentials进行讲解,包括:

1、直接使用TransactionEssentials的API

2、TransactionEssentials与spring、mybatis整合

3、Atomikos配置详解

 

直接使用TransactionEssentials的API

在maven项目的pom文件中引入以下依赖:

com.atomikostransactions-jdbc4.0.6

mysqlmysql-connector-java5.1.39

新建mysql数据库表

需要注意的是,在mysql中,只有innodb引擎才支持XA事务,所以这里显式的指定了数据库引擎为innodb。

-- 新建数据库db_user;
create database db_user;
-- 在db_user库中新建user表
create table db_user.user(id int AUTO_INCREMENT PRIMARY KEY,name varchar(50)) engine=innodb;
-- 新建数据库db_account;
create database db_account;
-- 在db_account库中新建account表
create table db_account.account(user_id int,money double) engine=innodb;

另外,在本案例中,db_user库和db_account库是位于同一个mysql实例中的。 

       

案例代码:

    在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。 

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;public class AtomikosExample {private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {// 连接池基本属性Properties p = new Properties();p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);p.setProperty("user", "root");p.setProperty("password", "your password");// 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSourceAtomikosDataSourceBean ds = new AtomikosDataSourceBean();//atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同ds.setUniqueResourceName(dbName);ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");ds.setXaProperties(p);return ds;}public static void main(String[] args) {AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");Connection conn1 = null;Connection conn2 = null;PreparedStatement ps1 = null;PreparedStatement ps2 = null;UserTransaction userTransaction = new UserTransactionImp();try {// 开启事务userTransaction.begin();// 执行db1上的sqlconn1 = ds1.getConnection();ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);ps1.setString(1, "tianshouzhi");ps1.executeUpdate();ResultSet generatedKeys = ps1.getGeneratedKeys();int userId = -1;while (generatedKeys.next()) {userId = generatedKeys.getInt(1);// 获得自动生成的userId}// 模拟异常 ,直接进入catch代码块,2个都不会提交
// int i=1/0;// 执行db2上的sqlconn2 = ds2.getConnection();ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");ps2.setInt(1, userId);ps2.setDouble(2, 10000000);ps2.executeUpdate();// 两阶段提交userTransaction.commit();} catch (Exception e) {try {e.printStackTrace();userTransaction.rollback();} catch (SystemException e1) {e1.printStackTrace();}} finally {try {ps1.close();ps2.close();conn1.close();conn2.close();ds1.close();ds2.close();} catch (Exception ignore) {}}}
}

2、TransactionEssentials与spring、mybatis整合

在pom中添加以下依赖 

org.springframeworkspring-jdbc4.3.7.RELEASE

org.springframeworkspring-context4.3.7.RELEASE

org.mybatismybatis3.4.1

org.mybatismybatis-spring1.3.1

新建User实体

package com.tianshouzhi.atomikos;
public class User {private int id;private String name;// setters and getters
}

新建Account实例

package com.tianshouzhi.atomikos;
public class Account {private int userId;private double money;// setters and getters
}

新建UserMapper接口,为了方便,这里使用了mybatis 注解方式,没有编写映射文件,作用是一样的

package com.tianshouzhi.atomikos.mappers.db_user;
import org.apache.ibatis.annotations.Insert;
import com.tianshouzhi.atomikos.User;
import org.apache.ibatis.annotations.Options;
public interface UserMapper {@Insert("INSERT INTO user(id,name) VALUES(#{id},#{name})")@Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")public void insert(User user);
}

新建AccountMapper接口

package com.tianshouzhi.atomikos.mappers.ds_account;
import com.tianshouzhi.atomikos.Account;
import org.apache.ibatis.annotations.Insert;
public interface AccountMapper {@Insert("INSERT INTO account(user_id,money) VALUES(#{userId},#{money})")public void insert(Account account);
}

新建使用JTA事务的bean,注意在使用jta事务的时候,依然可以使用spring的声明式事务管理

package com.tianshouzhi.atomikos;
import com.tianshouzhi.atomikos.mappers.db_user.UserMapper;
import com.tianshouzhi.atomikos.mappers.ds_account.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
public class JTAService {@Autowiredprivate UserMapper userMapper;//操作db_user库@Autowiredprivate AccountMapper accountMapper;//操作db_account库@Transactionalpublic void insert() {User user = new User();user.setName("wangxiaoxiao");userMapper.insert(user);// int i = 1 / 0;//模拟异常,spring回滚后,db_user库中user表中也不会插入记录Account account = new Account();account.setUserId(user.getId());account.setMoney(123456789);accountMapper.insert(account);}
}

编写配置文件spring-atomikos.xml


jdbc:mysql://localhost:3306/db_userrootshxx12151022jdbc:mysql://localhost:3306/db_accountrootshxx12151022

测试代码

package com.tianshouzhi.atomikos;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class AtomikosSpringMybatisExample {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");JTAService jtaService = context.getBean("jtaService", JTAService.class);jtaService.insert();}
}

    建议读者先直接按照上述代码运行,以确定代码执行后,db_user库的user表和db_account的account表中的确各插入了一条记录,以证明我们的代码的确是操作了2个库,属于分布式事务。

    然后将JTAService中的异常模拟的注释打开,会发现出现异常后,两个库中都没有新插入的数据库,说明我们使用的JTA事务管理器的确保证数据的一致性了。

Atomikos配置

    在掌握了Atomikos基本使用之后,我们对Atomikos的配置进行一下简单的介绍。Atomikos在启动后,默认会从以下几个位置读取配置文件,这里笔者直接贴出atomikos源码进行说明:

com.atomikos.icatch.provider.imp.AssemblerImp#initializeProperties方法中定义了配置加载顺序逻辑: 

@Overridepublic ConfigProperties initializeProperties() {//读取classpath下的默认配置transactions-defaults.propertiesProperties defaults = new Properties();loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);//读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值Properties transactionsProperties = new Properties(defaults);loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);//读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值Properties jtaProperties = new Properties(transactionsProperties);loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);//读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置Properties customProperties = new Properties(jtaProperties);loadPropertiesFromCustomFilePath(customProperties);//最终构造一个ConfigProperties对象,来表示实际要使用的配置Properties finalProperties = new Properties(customProperties);return new ConfigProperties(finalProperties);}

配置文件优先级:transactions-defaults.properties

其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-xxx.jar中.

5B4E3D0B-5CE5-4177-9CAD-C351ED3E897A.png

注意不同版本的默认配置可能不同。特别是3.x版本和4.x版本的差异比较明显。   

以下是4.0.6中 transactions-default.properties中配置内容,笔者对这些配置进行了归类,如下: 

===============================================================
============ 事务管理器(TM)配置参数 ==============
===============================================================
#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
com.atomikos.icatch.enable_logging=true
#JTA/XA资源是否应该自动注册
com.atomikos.icatch.automatic_resource_registration=true
#JTA事务的默认超时时间,默认为10000ms
com.atomikos.icatch.default_jta_timeout=10000
#事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
com.atomikos.icatch.max_timeout=300000
#指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
com.atomikos.icatch.threaded_2pc=false
#指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制
com.atomikos.icatch.max_actives=50
#是否支持subtransaction,默认为true
com.atomikos.icatch.allow_subtransactions=true
#指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
com.atomikos.icatch.serial_jta_transactions=true
#指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
com.atomikos.icatch.force_shutdown_on_vm_exit=false
#在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807===============================================================
========= 事务日志(Transaction logs)记录配置 =======
===============================================================
#事务日志目录,默认为./。
com.atomikos.icatch.log_base_dir=./
#事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
com.atomikos.icatch.log_base_name=tmlog
#指定两次checkpoint的时间间隔,默认为500
com.atomikos.icatch.checkpoint_interval=500===============================================================
========= 事务日志恢复(Recovery)配置 =============
===============================================================
#指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
#指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
#提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
com.atomikos.icatch.oltp_max_retries=5
#提交失败时,每次重试的时间间隔,默认10000ms
com.atomikos.icatch.oltp_retry_interval=10000===============================================================
========= 其他 =============================== ==
===============================================================
java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory
com.atomikos.icatch.client_demarcation=false
java.naming.provider.url=rmi://localhost:1099
com.atomikos.icatch.rmi_export_class=none
com.atomikos.icatch.trust_client_tm=false

当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。

关于不同版本配置的差异,请参考官方文档:https://www.atomikos.com/Documentation/JtaProperties

打印日志

4.x版本之后,优先尝试使用slf4j,如果没有则尝试使用log4j,如果二者都没有,则使用JUL。

参见:https://www.atomikos.com/Documentation/ConfiguringTheLogs

注意这里是说的是如何配置打印工作日志(work log),而前面说的是事务日志(transactions log),二者不是不同的。


推荐阅读
  • 本文详细介绍如何在Spring Boot项目中集成和使用JPA,涵盖JPA的基本概念、Spring Data JPA的功能以及具体的操作步骤,帮助开发者快速掌握这一强大的持久化技术。 ... [详细]
  • 本文介绍了如何在Java中使用`JCheckBoxMenuItem.setMnemonic()`方法,并提供了多个实际应用的代码示例。 ... [详细]
  • 本文档提供了详细的MySQL安装步骤,包括解压安装文件、选择安装类型、配置MySQL服务以及设置管理员密码等关键环节,帮助用户顺利完成MySQL的安装。 ... [详细]
  • 本文介绍了多种Eclipse插件,包括XML Schema Infoset Model (XSD)、Graphical Editing Framework (GEF)、Eclipse Modeling Framework (EMF)等,涵盖了从Web开发到图形界面编辑的多个方面。 ... [详细]
  • 使用Python模拟登录教务系统抓取成绩并分析存储
    本文详细介绍如何使用Python编程语言模拟登录学校教务系统,抓取学生的成绩信息,并进行数据分析和可视化处理,最终将数据存储到MySQL数据库中。 ... [详细]
  • 解决MySQL Administrator 登录失败问题
    本文提供了解决在使用MySQL Administrator时遇到的登录错误的方法,包括启动变量和服务部分禁用的问题。同时,文章还介绍了通过安全配置模式来解决问题的具体步骤。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 本文探讨了SQLAlchemy ORM框架中如何利用外键和关系(relationship)来建立表间联系,简化复杂的查询操作。通过示例代码详细解释了relationship的定义、使用方法及其与外键的相互作用。 ... [详细]
  • 深入探讨Web服务器与动态语言的交互机制:CGI、FastCGI与PHP-FPM
    本文详细解析了Web服务器(如Apache、Nginx等)与动态语言(如PHP)之间通过CGI、FastCGI及PHP-FPM进行交互的具体过程,旨在帮助开发者更好地理解这些技术背后的原理。 ... [详细]
  • Navicat Premium中MySQL用户管理:创建新用户及高级设置
    本文作为Navicat Premium用户管理系列的第二部分,主要介绍如何创建新的MySQL用户,包括设置基本账户信息、密码策略、账户限制以及SSL配置等。 ... [详细]
  • Struts2框架构建指南
    本文详细介绍了如何使用Struts2(版本2.3.16.3)构建Web应用,包括必要的依赖库添加、配置文件设置以及简单的示例代码。Struts2是Apache软件基金会下的一个开源框架,用于简化Java Web应用程序的开发。 ... [详细]
  • 对于初次购买阿里云服务器的新手用户来说,如何高效地利用服务器资源并成功部署网站是一个重要的课题。本文将详细指导您完成从购买服务器到网站上线的六个关键步骤。 ... [详细]
  • 本文探讨了一个在Spring项目中常见的问题——当pom.xml文件中引入了servlet依赖但未指定其作用域为provided时导致的应用启动失败。文章详细分析了错误原因,并提供了有效的解决方案。 ... [详细]
  • 本文详细介绍了在 Windows 7 上安装和配置 PHP 5.4 的 Memcached 分布式缓存系统的方法,旨在减少数据库的频繁访问,提高应用程序的响应速度。 ... [详细]
  • 本文探讨了在Linux系统中尝试访问远程MySQL数据库时遇到的权限拒绝错误,特别是当使用非root用户进行连接时出现的'Access denied for user'错误。 ... [详细]
author-avatar
33
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有