分布式事务saga实现的理论基础Hector&Kenneth在1987年发表的论文Sagas,它的核心思想是当整个事务的一个节点失败后,依赖于状态对当前事务从前到后进行重试,或者从后往前进行补偿。

 saga模式的主要应用场景是业务流程比较长,有一些服务不能提供TCC模式的三个接口,或者不能实现AT模式的依赖undolog实现自动补偿。

阿里的seata中间件是通过状态机来实现的,它使用状态图定义服务调用流程并生成json状态语言定义文件,状态图的节点可以是一个服务,也可以是补偿节点。这个生成的json由状态机引擎来驱动执行,出现异常是状态机引擎对调用成功的服务从后往前补偿,而补偿的逻辑需要由服务自己来实现。

 本文我们还是用之前TCC模式中的例子,我们在电商网站购买一件商品,后台首先会从订单服务下单,然后订单服务会调用账户服务扣减商品金额,如果成功,再调用库存服务扣减库存。如果其中某一步失败,则从后往前依次补偿,这个补偿事件由状态机触发。

配置状态机

首先我们需要创建3张表,sql语句如下,注意,本文使用的mysql:

create table seata_state_machine_def(    id               varchar(32)  not null comment 'id',    name             varchar(128) not null comment 'name',    tenant_id        varchar(32)  not null comment 'tenant id',    app_name         varchar(32)  not null comment 'application name',    type             varchar(20) comment 'state language type',    comment_         varchar(255) comment 'comment',    ver              varchar(16)  not null comment 'version',    gmt_create       timestamp(3)    not null comment 'create time',    status           varchar(2)   not null comment 'status(AC:active|IN:inactive)',    content          longtext comment 'content',    recover_strategy varchar(16) comment 'transaction recover strategy(compensate|retry)',    primary key (id));CREATE TABLE seata_state_machine_inst(    id                  VARCHAR(128) NOT NULL COMMENT 'id',    machine_id          VARCHAR(32) NOT NULL COMMENT 'state machine definition id',    tenant_id           VARCHAR(32) NOT NULL COMMENT 'tenant id',    parent_id           VARCHAR(128) COMMENT 'parent id',    gmt_started         TIMESTAMP(3)   NOT NULL COMMENT 'start time',    business_key        VARCHAR(48) COMMENT 'business key',    start_params        LONGTEXT COMMENT 'start parameters',    gmt_end             TIMESTAMP(3) COMMENT 'end time',    excep               BLOB COMMENT 'exception',    end_params          LONGTEXT COMMENT 'end parameters',    STATUS              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',    compensation_status VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',    is_running          TINYINT(1) COMMENT 'is running(0 no|1 yes)',    gmt_updated         TIMESTAMP(3)   NOT NULL,    PRIMARY KEY (id),    UNIQUE KEY unikey_buz_tenant (business_key, tenant_id));CREATE TABLE seata_state_inst(    id                       VARCHAR(48)  NOT NULL COMMENT 'id',    machine_inst_id          VARCHAR(128)  NOT NULL COMMENT 'state machine instance id',    NAME                     VARCHAR(128) NOT NULL COMMENT 'state name',    TYPE                     VARCHAR(20) COMMENT 'state type',    service_name             VARCHAR(128) COMMENT 'service name',    service_method           VARCHAR(128) COMMENT 'method name',    service_type             VARCHAR(16) COMMENT 'service type',    business_key             VARCHAR(48) COMMENT 'business key',    state_id_compensated_for VARCHAR(50) COMMENT 'state compensated for',    state_id_retried_for     VARCHAR(50) COMMENT 'state retried for',    gmt_started              TIMESTAMP(3)    NOT NULL COMMENT 'start time',    is_for_update            TINYINT(1) COMMENT 'is service for update',    input_params             LONGTEXT COMMENT 'input parameters',    output_params            LONGTEXT COMMENT 'output parameters',    STATUS                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',    excep                    BLOB COMMENT 'exception',    gmt_end                  TIMESTAMP(3) COMMENT 'end time',    PRIMARY KEY (id, machine_inst_id));

本文电商购物的流程状态,我们用下图表示:

 注意:seata提供了下面地址可以绘制这个图,同时生成对应的json代码。本文的json代码是参考官方示例手工改写的。

http://seata.io/saga_designer/index.html#/

状态机需要依赖一个json,这个json定义了上面流程图中的节点,代码如下:

{    "Name": "buyGoodsOnline",    "Comment": "buy a goods on line, add order, deduct account, deduct storage ",    "StartState": "SaveOrder",    "Version": "0.0.1",    "States": {        "SaveOrder": {            "Type": "ServiceTask",            "ServiceName": "orderSave",            "ServiceMethod": "saveOrder",            "CompensateState": "DeleteOrder",            "Next": "ChoiceAccountState",            "Input": [                "$.[businessKey]",                "$.[order]"            ],            "Output": {                "SaveOrderResult": "$.#root"            },            "Status": {                "#root == true": "SU",                "#root == false": "FA",                "$Exception{java.lang.Throwable}": "UN"            }        },        "ChoiceAccountState":{            "Type": "Choice",            "Choices":[                {                    "Expression":"[SaveOrderResult] == true",                    "Next":"ReduceAccount"                }            ],            "Default":"Fail"        },        "ReduceAccount": {            "Type": "ServiceTask",            "ServiceName": "accountService",            "ServiceMethod": "decrease",            "CompensateState": "CompensateReduceAccount",            "Next": "ChoiceStorageState",            "Input": [                "$.[businessKey]",                "$.[userId]",                "$.[money]",                {                    "throwException" : "$.[mockReduceAccountFail]"                }            ],            "Output": {                "ReduceAccountResult": "$.#root"            },            "Status": {                "#root == true": "SU",                "#root == false": "FA",                "$Exception{java.lang.Throwable}": "UN"            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"                }            ]        },        "ChoiceStorageState":{            "Type": "Choice",            "Choices":[                {                    "Expression":"[ReduceAccountResult] == true",                    "Next":"ReduceStorage"                }            ],            "Default":"Fail"        },        "ReduceStorage": {            "Type": "ServiceTask",            "ServiceName": "storageService",            "ServiceMethod": "decrease",            "CompensateState": "CompensateReduceStorage",            "Input": [                "$.[businessKey]",                "$.[productId]",                "$.[count]",                {                    "throwException" : "$.[mockReduceStorageFail]"                }            ],            "Output": {                "ReduceStorageResult": "$.#root"            },            "Status": {                "#root == true": "SU",                "#root == false": "FA",                "$Exception{java.lang.Throwable}": "UN"            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"                }            ],            "Next": "Succeed"        },        "DeleteOrder": {            "Type": "ServiceTask",            "ServiceName": "orderSave",            "ServiceMethod": "deleteOrder",            "Input": [                "$.[businessKey]",                "$.[order]"            ]        },        "CompensateReduceAccount": {            "Type": "ServiceTask",            "ServiceName": "accountService",            "ServiceMethod": "compensateDecrease",            "Input": [                "$.[businessKey]",                "$.[userId]",                "$.[money]"            ]        },        "CompensateReduceStorage": {            "Type": "ServiceTask",            "ServiceName": "storageService",            "ServiceMethod": "compensateDecrease",            "Input": [                "$.[businessKey]",                "$.[productId]",                "$.[count]"            ]        },        "CompensationTrigger": {            "Type": "CompensationTrigger",            "Next": "Fail"        },        "Succeed": {            "Type":"Succeed"        },        "Fail": {            "Type":"Fail",            "ErrorCode": "PURCHASE_FAILED",            "Message": "purchase failed"        }    }}

上面的json中,我们定义了6个ServiceTask,分别对应订单服务保存订单、账户服务扣减账户和库存服务扣减库存以及对应的补偿机制。
我们定义了CompensationTrigger,并且在账户服务和库存服务抛出异常后,会调用CompensationTrigger来触发补偿事件。
对于每一个节点,我们需要定义type,同时对于ServiceTask类型的节点,我们需要定义触发方法,回滚事件对应的ServiceTask,下一个流程节点,输入/输出参数、异常等。

订单服务

订单服务是本文中的集成服务,它会调用账户服务和库存服务来实现业务。

首先,前端发起下单请求后,订单服务会接收这个服务,然后启动状态机,这个代码在OrderServiceImpl,代码如下:

public boolean create(Order order) {    LOGGER.info("------->交易开始");    StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");    Map<String, Object> startParams = new HashMap<>(3);    String businessKey = String.valueOf(System.currentTimeMillis());    startParams.put("businessKey", businessKey);    startParams.put("order", order);    startParams.put("mockReduceAccountFail", "true");    startParams.put("userId", order.getUserId());    startParams.put("money", order.getPayAmount());    startParams.put("productId", order.getProductId());    startParams.put("count", order.getCount());    //这里状态机是同步的,seata也支持异步,可以参考官方示例    StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());    System.out.println("saga transaction commit succeed. XID: " + inst.getId());    inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstanceByBusinessKey(businessKey, null);    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());    return true;}

保存账单OrderSaveImpl对应json里面的orderSave,里面的回滚方法就是deleteOrder,代码如下:

public class OrderSaveImpl implements OrderApi{    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private OrderDao orderDao;    @Override    public boolean saveOrder(String businessKey, Order order) {        logger.info("保存订单, businessKey:{}, order: {}", businessKey, order);        orderDao.create(order);        return true;    }    /**     * 回滚事务,删除订单     * @param order order     * @return     */    @Override    public boolean deleteOrder(String businessKey,Order order){        logger.info("删除订单, businessKey:{}, order: {}", businessKey, order);        orderDao.delete(order);        return true;    }}

定义调用账户服务,这里使用feign调用,补偿方法是compensateDecrease,对应json文件中的accountService,代码如下:

public class AccountServiceImpl implements AccountService{    @Resource    private AccountApi accountApi;    @Override    public boolean decrease(String businessKey, Long userId, BigDecimal money) {        return accountApi.decrease(businessKey, userId, money);    }    @Override    public boolean compensateDecrease(String businessKey, Long userId, BigDecimal money) {        return accountApi.compensateDecrease(businessKey, userId, money);    }}

定义调用库存服务,这里使用feign调用,补偿方法是compensateDecrease,对应json文件中的storageService,代码如下:

public class StorageServiceImpl implements StorageService{    @Resource    private StorageApi storageApi;    @Override    public boolean decrease(String businessKey, Long productId, Integer count) {        return storageApi.decrease(businessKey, productId, count);    }    @Override    public boolean compensateDecrease(String businessKey, Long productId, Integer count) {        return storageApi.compensateDecrease(businessKey, productId, count);    }}

库存服务 

看了订单服务代码,再看库存服务就非常简单了,为订单服务提供http接口,包括2个方法,扣减库存和补偿扣减库存,controller调用的service代码如下:

@Service("storageService")public class StorageServiceImpl implements StorageService {    private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);    @Resource    private StorageDao storageDao;    @Override    public boolean decrease(Long productId, Integer count) {        LOGGER.info("扣减库存, commit, productId:{}, count:{}", productId, count);        storageDao.decrease(productId, count);        //throw new RuntimeException();    }    @Override    public boolean compensateDecrease(Long productId, Integer count) {        LOGGER.info("补偿扣减库存, compensate, productId:{}, count:{}", productId, count);        storageDao.compensateDecrease(productId, count);        return true;    }}

 账户服务的代码跟这个类似,就不再贴代码了。

测试

本文的实验中,订单、账户、库存服务都有自己的数据库,这里不再贴sql了,需要了解的可以看我之前的文章《springcloud+eureka整合seata-AT模式》,或者下载源代码,文末有源码地址。

 开始实验之前,订单表没有数据,账户和库存数据如下:

 account表:

 

 storage表:

我们进行一次成功的实验,向下面的url发送购买商品请求:

http://localhost:8180/order/create{  "userId":1,  "productId":1,  "count":1,  "money":1,  "payAmount":50}

成功之后发现订单表有了数据,账户表和库存表数据如下:
order表:

 account表

 storage表

我们把库存服务的decrease方法改成如下:

public boolean decrease(Long productId, Integer count) {    LOGGER.info("扣减库存, commit, productId:{}, count:{}", productId, count);storageDao.decrease(productId, count);    throw new RuntimeException();}

这时发送购买商品请求后会抛出异常,然后3个服务的事务依次做交易补偿,所有表数据没有变。

 总结

seata中的saga模式适用于长流程或者长事务的场景。而saga模式复杂的地方在于引入了状态机,需要定义状态机的流程,把定义好的流程用json文件引入工程中。
同时saga模式需要开发者自己定义回滚事件,如果回滚失败,对整个事务的控制就非常复杂了。

本文源码地址:

https://github.com/jinjunzhu/springcloud-eureka-seata-saga.git



©著作权归作者所有:来自51CTO博客作者朱晋君的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 网站需要更换服务器应该怎么做?
  2. 12. SpringCloud实战项目-搭建管理后台
  3. 新手购买阿里云服务器图文教程及注意事项
  4. 10. SpringCloud实战项目-微服务划分图
  5. 01. SpringCloud实战项目-五分钟搞懂分布式基础概念
  6. 徒手开发一个迷你版本的tomcat服务器
  7. 个人搭建网站要如何选择服务器?
  8. 09. SpringCloud实战项目-初始化项目和添加微服务
  9. 15.SpringCloud实战项目-生成所有微服务的CRUD代码

随机推荐

  1. 利用jQuery实现CheckBox全选/全不选/反选
  2. jquery中的globalEval()源码分析
  3. 从jQuery每个循环中删除item [i]
  4. Jquery 特效 图片轮转 菜单
  5. jQuery学习21---简单动画效果,show,hide,
  6. 将Object转换为字符串并返回[复制]
  7. Jquery 只执行一次的代码
  8. Jquery Validate 相关参数及常用的自定义
  9. BootStrap入门教程 (四)
  10. 分享27款非常棒的 jQuery 表单插件