目录

配置seata

搭建工程 

运行结果 

 原理简介

 总结


seata是阿里推出的分布式事务解决方案,今天我来分享一下springboot多数据源怎么整合seata解决分布式事务的问题。它有3中模式:AT模式,TCC模式和Saga模式,本文主要介绍一下AT模式。

首先说一下本文使用的实验环境
springboot:2.1.6.RELEASE
orm框架:mybatis
数据库:mysql
数据库连接池:HikariCP
seata server:1.3.0

配置seata

首先下载seata server安装包,本文使用版本是1.3.0,下载地址如下:

https://github.com/seata/seata/releases

下载完成后,解压后在seata目录下建一个目录logs,在里面建一个文件seata_gc.log,如果不创建这个log文件,启动会报找不到文件的错误。
启动server,我本地使用windows环境的启动命令如下:

seata-server.bat -p 8091 -h 127.0.0.1 -m file

 关于启动命令的说明,我摘自官网(http://seata.io/en-us/docs/user/quickstart.html)

Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]  Options:    --host, -h      The host to bind.      Default: 0.0.0.0    --port, -p      The port to listen.      Default: 8091    --storeMode, -m      log store mode : file、db      Default: file    --helpe.g.sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

 下面是启动成功后的日志:

00:21:10,892 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[ERROR] - Active log file name: /root/logs/seata/txc.8091.error.log00:21:10,892 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[ERROR] - File property is set to [/root/logs/seata/txc.8091.error.log]00:21:10,892 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO00:21:10,892 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [ALL] to Logger[ROOT]00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [WARN] to Logger[ROOT]00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [ERROR] to Logger[ROOT]00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [CONSOLE] to Logger[ROOT]00:21:10,893 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.00:21:10,894 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@2038ae61 - Registering current configuration as safe fallback point2020-08-06 00:21:11.194  INFO --- [           main] io.seata.config.FileConfiguration        : The configuration file used is registry.conf2020-08-06 00:21:11.264  INFO --- [           main] io.seata.config.FileConfiguration        : The configuration file used is file.conf2020-08-06 00:21:11.959  INFO --- [           main] i.s.core.rpc.netty.NettyServerBootstrap  : Server started, listen port: 8091

搭建工程 

连接seata server的方式有多种,这儿我们采用文件的方式:

registry {  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa  type = "file"  file {    name = "file.conf"  }}

关于springboot多数据源的配置,如果你还不太熟悉,看一下我写的这篇文章《springboot研究三:springboot多数据源配置+mybatis+mysql》,本文不再介绍。
本文的示例来自seata官方示例,我做了修改,需要3个数据库:seata_pay、seata_order和seata_storage,每个数据库都有一张undo_log表记录回滚日志。sql语句如下:

# OrderDROP DATABASE IF EXISTS seata_order;CREATE DATABASE seata_order;CREATE TABLE seata_order.orders(    id               INT(11) NOT NULL AUTO_INCREMENT,    user_id          INT(11)        DEFAULT NULL,    product_id       INT(11)        DEFAULT NULL,    pay_amount       DECIMAL(10, 0) DEFAULT NULL,    status           VARCHAR(100)   DEFAULT NULL,    add_time         DATETIME       DEFAULT CURRENT_TIMESTAMP,    last_update_time DATETIME       DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,    PRIMARY KEY (id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;CREATE TABLE seata_order.undo_log(    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,    branch_id     BIGINT(20)   NOT NULL,    xid           VARCHAR(100) NOT NULL,    context       VARCHAR(128) NOT NULL,    rollback_info LONGBLOB     NOT NULL,    log_status    INT(11)      NOT NULL,    log_created   DATETIME     NOT NULL,    log_modified  DATETIME     NOT NULL,    PRIMARY KEY (id),    UNIQUE KEY ux_undo_log (xid, branch_id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;# StorageDROP DATABASE IF EXISTS seata_storage;CREATE DATABASE seata_storage;CREATE TABLE seata_storage.product(    id               INT(11) NOT NULL AUTO_INCREMENT,    price            DOUBLE   DEFAULT NULL,    stock            INT(11)  DEFAULT NULL,    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,    PRIMARY KEY (id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;INSERT INTO seata_storage.product (id, price, stock) VALUES (1, 5, 10);CREATE TABLE seata_storage.undo_log(    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,    branch_id     BIGINT(20)   NOT NULL,    xid           VARCHAR(100) NOT NULL,    context       VARCHAR(128) NOT NULL,    rollback_info LONGBLOB     NOT NULL,    log_status    INT(11)      NOT NULL,    log_created   DATETIME     NOT NULL,    log_modified  DATETIME     NOT NULL,    PRIMARY KEY (id),    UNIQUE KEY ux_undo_log (xid, branch_id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;# PayDROP DATABASE IF EXISTS seata_pay;CREATE DATABASE seata_pay;CREATE TABLE seata_pay.account(    id               INT(11) NOT NULL AUTO_INCREMENT,    balance          DOUBLE   DEFAULT NULL,    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,    PRIMARY KEY (id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;CREATE TABLE seata_pay.undo_log(    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,    branch_id     BIGINT(20)   NOT NULL,    xid           VARCHAR(100) NOT NULL,    context       VARCHAR(128) NOT NULL,    rollback_info LONGBLOB     NOT NULL,    log_status    INT(11)      NOT NULL,    log_created   DATETIME     NOT NULL,    log_modified  DATETIME     NOT NULL,    PRIMARY KEY (id),    UNIQUE KEY ux_undo_log (xid, branch_id)) ENGINE = InnoDB  AUTO_INCREMENT = 1  DEFAULT CHARSET = utf8;INSERT INTO seata_pay.account (id, balance) VALUES (1, 1);SELECT auto_incrementFROM information_schema.TABLESWHERE TABLE_SCHEMA = 'seata_order'  AND TABLE_NAME = 'undo_log'

整个springboot的目录如下

 这里用的是动态数据源,跟我之前文章讲的mybatis多数据源配置不同,这里我主要讲一下这一块

首先看一下mybatis config文件mybatis.xml,所有数据库的xml映射文件都写入一个xml里面,代码如下:

<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE configuration        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"        "http://mybatis.org/dtd/mybatis-3-config.dtd"><configuration><settings><setting name="cacheEnabled" value="false" /><setting name="logImpl" value="SLF4J" /><setting name="jdbcTypeForNull" value="NULL"/></settings><typeAliases><typeAlias alias="User" type="boot.domain.order.Order" /><typeAlias alias="Employee" type="boot.domain.order.OrderStatus" /></typeAliases><mappers><!--order库--><mapper resource="mybatis/ordermapper/OrderMapper.xml" /><!--account库--><mapper resource="mybatis/paymapper/AccountMapper.xml" /><!--storage库--><mapper resource="mybatis/storagemapper/ProductMapper.xml" /></mappers></configuration>

再看一下application.properties中数据源的配置,这儿我们创建了3个数据库,所以需要配置3个数据源:

spring.application.name=springboot-seata######seata_pay#############datasource.pay.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_pay?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8datasource.pay.username=rootdatasource.pay.password=123456datasource.pay.driver-class-name=com.mysql.cj.jdbc.Driver######seata_storage#############datasource.storage.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_storage?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8datasource.storage.username=rootdatasource.storage.password=123456datasource.storage.driver-class-name=com.mysql.cj.jdbc.Driver######seata_order#############datasource.order.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_order?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8datasource.order.username=rootdatasource.order.password=123456datasource.order.driver-class-name=com.mysql.cj.jdbc.Driverspring.cloud.alibaba.seata.tx-service-group=springboot-seata

mybatis动态数据源配置类如下:

@Configuration@MapperScan(basePackages={"boot.mapper"}, sqlSessionFactoryRef = "sqlSessionFactoryBean")public class DataSourceProxyConfig {    @Bean("originOrder")    @ConfigurationProperties(prefix = "datasource.order")    public DataSource dataSourceMaster() {        return DataSourceBuilder.create().build();    }    @Bean("originStorage")    @ConfigurationProperties(prefix = "datasource.storage")    public DataSource dataSourceStorage() {        return DataSourceBuilder.create().build();    }    @Bean("originPay")    @ConfigurationProperties(prefix = "datasource.pay")    public DataSource dataSourcePay() {        return DataSourceBuilder.create().build();    }    @Bean(name = "order")    public DataSourceProxy masterDataSourceProxy(@Qualifier("originOrder") DataSource dataSource) {        return new DataSourceProxy(dataSource);    }    @Bean(name = "storage")    public DataSourceProxy storageDataSourceProxy(@Qualifier("originStorage") DataSource dataSource) {        return new DataSourceProxy(dataSource);    }    @Bean(name = "pay")    public DataSourceProxy payDataSourceProxy(@Qualifier("originPay") DataSource dataSource) {        return new DataSourceProxy(dataSource);    }    @Bean("dynamicDataSource")    public DataSource dynamicDataSource(@Qualifier("order") DataSource dataSourceOrder,                                        @Qualifier("storage") DataSource dataSourceStorage,                                        @Qualifier("pay") DataSource dataSourcePay) {//这儿是动态数据源配置的关键,3个数据源放在了一个map里面        DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource();        Map<Object, Object> dataSourceMap = new HashMap<>(3);        dataSourceMap.put(DataSourceKey.ORDER.name(), dataSourceOrder);        dataSourceMap.put(DataSourceKey.STORAGE.name(), dataSourceStorage);        dataSourceMap.put(DataSourceKey.PAY.name(), dataSourcePay);        dynamicRoutingDataSource.setDefaultTargetDataSource(dataSourceOrder);        dynamicRoutingDataSource.setTargetDataSources(dataSourceMap);        DynamicDataSourceContextHolder.getDataSourceKeys().addAll(dataSourceMap.keySet());        return dynamicRoutingDataSource;    }    @Bean    @ConfigurationProperties(prefix = "mybatis")    public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("dynamicDataSource") DataSource dataSource) {        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();        sqlSessionFactoryBean.setDataSource(dataSource);        sqlSessionFactoryBean.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:mybatis/mybatis.xml"));//这儿只能放一个config文件,所以3个数据源的xml文件放到了一个文件里面        return sqlSessionFactoryBean;    }}

在service调用dao时,需要切换数据源,代码如下:

DynamicDataSourceContextHolder.setDataSourceKey(DataSourceKey.ORDER);//切换到seata_order数据库

这里面的切换本质上是将数据库名称放到一个ThreadLocal上

public class DynamicDataSourceContextHolder {    private static final ThreadLocal<String> CONTEXT_HOLDER = ThreadLocal.withInitial(DataSourceKey.ORDER::name);    private static List<Object> dataSourceKeys = new ArrayList<>();    public static void setDataSourceKey(DataSourceKey key) {        CONTEXT_HOLDER.set(key.name());    }    public static String getDataSourceKey() {        return CONTEXT_HOLDER.get();    }    public static void clearDataSourceKey() {        CONTEXT_HOLDER.remove();    }    public static List<Object> getDataSourceKeys() {        return dataSourceKeys;    }}

 而ThreadLocal上面的数据库为动态数据源使用

public class DynamicRoutingDataSource extends AbstractRoutingDataSource {    @Override    protected Object determineCurrentLookupKey() {        log.info("当前数据源 [{}]", DynamicDataSourceContextHolder.getDataSourceKey());        return DynamicDataSourceContextHolder.getDataSourceKey();    }}

这样seata在获取连接的时候,就可以取到当前的数据库连接,因为每个库里面都有一个undo_log表需要写回滚日志,所以必须能够保证seata能够动态获取当前的数据库。

运行结果 

执行Application的main函数启动工程。
在上面的建表语句中,我们建了订单seata_order、支付seata_pay、库存seata_storage3个数据库,支付数据库中的account表插入了一条记录,余额是1,库存数据库的product表插入了1条记录,商品数量(stock)是10。
我们用postman模拟发送一个请求:

url:http://localhost:8083/order/placeOrder,content:{ "userId":1, "productId":1, "price":1}

 执行成功,这时orders表插入了1条记录,account表余额减为0,product表商品数量减为9。如下3个图,

 

 

 

 

 

这是我们再发一次上面的http请求,会失败,因为账户余额不足。这次我们采用debug方式,看一下undo_log的数据。程序执行到下图中的断点时,会产生undo_log,如下图:

我们看下undo_log中字段rollback_info数据:

{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.59.132:8091:34937248742391808","branchId":34937257391046656,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"INSERT","tableName":"orders","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords","tableName":"orders","rows":["java.util.ArrayList",[]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"orders","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":4,"value":2},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"user_id","keyType":"NULL","type":4,"value":1},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"product_id","keyType":"NULL","type":4,"value":1},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"pay_amount","keyType":"NULL","type":3,"value":["java.math.BigDecimal",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"status","keyType":"NULL","type":12,"value":"INIT"},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"add_time","keyType":"NULL","type":93,"value":["java.sql.Timestamp",[1596793692000,0]]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"last_update_time","keyType":"NULL","type":93,"value":["java.sql.Timestamp",[1596793692000,0]]}]]}]]}}]]}

可以看到undo_log记录了要回滚的表中的每个字段的值,比如id=2,回滚时使用。
这时我们看seata server的日志,如下:

2020-08-06 21:49:08.740  INFO --- [Thread_1_12_500] io.seata.server.coordinator.DefaultCore  : Rollback global transaction successfully, xid = 192.168.59.132:8091:34937248742391808.

 原理简介

其实分布式事务的原理还是使用了单数据库的事务原理,我们可以把分布式事务中每个数据库看做是单数据库的表。首先每个事务有一个全局的事务id,叫做xid,上面的第二个例子中值是192.168.59.132:8091:34937248742391808。
有了这个xid后,我们就可以记录undo_log了,undo_log中记录了这个xid的,每次提交事务前都要先写undo_log,后提交事务,这时你一定恍然大悟,这不就是mysql中的wal机制吗?
而rollback_info字段记录了要回滚的表的记录中的每个字段和对应值,这样就方便的回滚了。这时你肯定又恍然大悟,与其说是回滚,难道这不就是交易补偿啊?
理解了这个,在理解seata官方的解释就容易多了,如下图:

seata中有3个角色,TC其实就是seata server,RM是单个数据库的事务管理器,TM是定义开启和提交回滚全局事务的组件。官方定义如下:

Transaction Coordinator(TC): Maintain status of global and branch transactions, drive the global commit or rollback.Transaction Manager(TM): Define the scope of global transaction: begin a global transaction, commit or rollback a global transaction.Resource Manager(RM): Manage resources that branch transactions working on, talk to TC for registering branch transactions and reporting status of branch transactions, and drive the branch transaction commit or rollback.

 总结

本文主要介绍了springboot多数据源整合seata的使用,也简单介绍了一些原理。seata对分布式事务的管理思想其实还是单个数据库事务的思想。后面有时间再详细介绍seata的原理。 

 

源代码地址:

https://github.com/jinjunzhu/springboot-seata.git

参考:

http://seata.io/en-us/docs/user/quickstart.htmlhttps://github.com/seata/seatahttps://github.com/seata/seata-samples

 


©著作权归作者所有:来自51CTO博客作者朱晋君的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 通过注解的 springboot+mybatis 多数据源配置
  2. 10. SpringCloud实战项目-微服务划分图
  3. 11. SpringCloud实战项目-初始化数据库和表
  4. Windows如何安装mysql数据库!
  5. 【OCP最新题库解析(052)--题16】Your database instance is in N
  6. 【DG】怎么使用Data Pump备份物理备库
  7. DNS 引起经典RAC故障
  8. DUAL系列
  9. SQL数据库中了.chch后缀勒索病毒如何解决?

随机推荐

  1. php性能优化的方法介绍
  2. php实现顺序线性表
  3. Kubernetes 如果是个水族馆
  4. 自学系列 | 就谈自我管理!
  5. 智慧城市信息安全探讨
  6. 1-22
  7. 动画:面试必刷之二叉树的子结构
  8. 详解 centos7设置nfs文件共享 实操记
  9. 2020 总结 | VoltDB的亮点,你了解多少?
  10. 自学系列 | 俺今天就谈兴趣!