分布式事务实战
实战代码示例: https://gitee.com/ixinglan/mysql-demo.git
sharding-jdbc-demo: 详见README.md 介绍
ShardingSphere整合了XA、Saga和Seata模式后,为分布式事务控制提供了极大的便利,我们可以在应用程序编程时,采用以下统一模式进行使用。
-
引入依赖
//XA模式 <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-transaction-xa-core</artifactId> <version>${shardingsphere.version}</version> </dependency> //Saga模式 <dependency> <groupId>io.shardingsphere</groupId> <artifactId>sharding-transaction-base-saga</artifactId> <version>${shardingsphere-spi-impl.version}</version> </dependency> //Seata模式 <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-transaction-base-seata-at</artifactId> <version>${sharding-sphere.version}</version> </dependency>
-
JAVA编码方式设置事务类型
TransactionTypeHolder.set(TransactionType.XA); TransactionTypeHolder.set(TransactionType.BASE);
-
参数设置
ShardingSphere默认的XA事务管理器为Atomikos,通过在项目的classpath中添加
jta.properties
来定制化Atomikos配置项。具体的配置规则如下:#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证 com.atomikos.icatch.enable_logging=true #JTA/XA资源是否应该自动注册 com.atomikos.icatch.automatic_resource_registration=true #JTA事务的默认超时时间,默认为10000ms com.atomikos.icatch.default_jta_timeout=10000 #事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示 不设置超时时间 com.atomikos.icatch.max_timeout=300000 #指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本 默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。 com.atomikos.icatch.threaded_2pc=false #指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制 com.atomikos.icatch.max_actives=50 #是否支持subtransaction,默认为true com.atomikos.icatch.allow_subtransactions=true #指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为 false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN) com.atomikos.icatch.serial_jta_transactions=true #指定JVM关闭时是否强制(force)关闭事务管理器,默认为false com.atomikos.icatch.force_shutdown_on_vm_exit=false #在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807 ========= 日志记录配置======= #事务日志目录,默认为./。 com.atomikos.icatch.log_base_dir=./ #事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件 以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。 com.atomikos.icatch.log_base_name=tmlog #指定两次checkpoint的时间间隔,默认为500 com.atomikos.icatch.checkpoint_interval=500 =========日志恢复配置============= #指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000 #指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout 相同 com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout } #提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5 com.atomikos.icatch.oltp_max_retries=5 #提交失败时,每次重试的时间间隔,默认10000ms com.atomikos.icatch.oltp_retry_interval=10000
Saga可以通过在项目的classpath中添加
saga.properties
来定制化Saga事务的配置项。配置项的属性及说明如下:属性 默认值 说明 saga.actuator.executor.size 5 使用的线程池大小 saga.actuator.transaction.max.retries 5 失败SQL的最大重试次数 saga.actuator.compensation.max.retries 5 失败SQL的最大尝试补偿次数 saga.actuator.transaction.retry.delay.milliseconds 5000 失败SQL的重试间隔,单位毫秒 saga.actuator.compensation.retry.delay.milliseconds 3000 失败SQL的补偿间隔,单位毫秒 saga.persistence.enabled False 是否对日志进行持久化 saga.persistence.ds.url 事务日志数据库JDBC 连接 saga.persistence.ds.username 事务日志数据库用户名 saga.persistence.ds.password 事务日志数据库密码 saga.persistence.ds.max.pool.size 50 事务日志连接池最大连接数 saga.persistence.ds.min.pool.size 1 事务日志连接池最小连接数 saga.persistence.ds.max.life.time.milliseconds 0(无限制) 事务日志连接池最大存活时间,单位毫秒 saga.persistence.ds.idle.timeout.milliseconds 60*1000 事务日志连接池空闲回收时间,单位毫秒 saga.persistence.ds.connection.timeout.milliseconds 30*1000 事务日志连接池超时时间,单位毫秒
这里演示saga的使用
- 添加saga.properties
- 主配置文件修改 active: sharding-database
- 启动类添加 @EnableTransactionManagement
- 测试方法 TestShardingTransaction: test1
- 添加 @Transactional
- 设置
TransactionTypeHolder.set(TransactionType.XA);
或者用 @ShardingTransactionType(TransactionType.XA)