使用Java轻松完成一个分布式事务TCC,自动处理空值补偿、挂起、幂等《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文发表了。TCC组成TCC分为3个阶段Try阶段:尝试执行,完成所有业务检查(一致性),预留必要的业务资源(准隔离)Confirm阶段:如果Try的所有分支都成功,则进入Confirm阶段。Confirm实际执行业务,不做任何业务检查,只使用Try阶段预留的业务资源。Cancel阶段:如果所有分支中有一个Try失败,则进入Cancel阶段。Cancel释放Try阶段预留的业务资源。在TCC分布式事务中,有3个角色,与经典的XA分布式事务相同:AP/应用程序,发起全局事务,定义全局事务中包含哪些事务分支RM/资源管理器,负责分支交易各种资源的管理TM/transactionmanager负责协调全局交易的正确执行,包括Confirm和Cancel的执行,以及处理网络异常。如果我们要进行一个类似银行跨行转账的业务,分别在不同的微服务中转出(TransOut)和转入(TransIn),一个成功完成的TCC交易的典型时序图如下:TCC网络异常期间TCC整个全局事务过程,可能会出现各种网络异常,典型的是空回滚、幂等、挂起。以下示例使用dtm的子事务屏障SDK来优雅地处理这些异常。在我所知道的每一个开源项目中,目前(2021-12-01)开发者都需要手动处理这些异常,而我们项目的系统自动处理,这是第一次,大大降低了分布式事务的开发难度。这里有一篇文章,分布式事务的这些常见用法都有陷阱。下面我们就来看看正确的姿势,详细解释一下这种网络异常,目前的技术现状和问题,以及我们的解决方案。TCC实践对于之前的跨行转账操作,最简单的做法是在Try阶段调整余额,在Cancel阶段反转余额,在Confirm阶段什么都不做。这样造成的问题是,如果A扣钱成功,但是转账给B失败,就会回滚,A的余额会调整到初始值。在这个过程中,如果A发现自己的余额被扣了,而收款人B却迟迟没有收到余额,就会给A带来麻烦。更好的做法是在Try阶段冻结A转账的金额,确认实际扣款,Cancel解冻资金,让用户在任何阶段看到的数据都清晰明了。接下来,让我们开发一个TCC事务。我们的例子使用的是Java语言,使用的分布式事务框架是https://github.com/yedf/dtm,非常优雅的支持分布式事务。下面详细解释一下TCC的组成。我们首先创建一个用户余额表。建表语句如下:createtableifnotexistsdtm_busi.user_account(idint(11)PRIMARYKEYAUTO_INCREMENT,user_idint(11)UNIQUE,balanceDECIMAL(10,2)notnulldefault'0',trading_balanceDECIMAL(10、2)notnulldefault'0',create_timedatetimeDEFAULTnow(),update_timedatetimeDEFAULTnow(),key(create_time),key(update_time));表中,trading_balance记录了正在交易的金额。先写核心代码,冻结/解冻资金操作,检查约束balance+trading_balance>=0,如果约束不成立则执行失败publicvoidadjustTrading(Connectionconnection,TransReqtransReq)throwsException{Stringsql="更新dtm_busi.user_account设置trading_balance=trading_balance+?"+"其中user_id=?andtrading_balance+?+balance>=0";PreparedStatementpreparedStatement=null;尝试{preparedStatement=connection.prepareStatement(sql);preparedStatement.setInt(1,transReq.getAmount());preparedStatement.setInt(2,transReq.getUserId());preparedStatement.setInt(3,transReq.getAmount());if(preparedStatement.executeUpdate()>0){System.out.println("交易金额更新成功");}else{thrownewFailureException("交易失败");}}finally{if(null!=preparedStatement){preparedStatement.close();}}}然后调整余额publicvoidadjustBalance(Connectionconnection,TransReqtransReq)抛出SQLException{PreparedStatementpreparedStatement=null;try{Stringsql="updatedtm_busi.user_accountsettrading_balance=trading_balance-?,balance=balance+?whereuser_id=?";preparedStatement=connection.prepareStatement(sql);preparedStatement.setInt(1,transReq.getAmount());preparedStatement.setInt(2,transReq.getAmount());preparedStatement.setInt(3,transReq.getUserId());if(preparedStatement.executeUpdate()>0){System.out.println("余额更新成功");}}finally{if(null!=preparedStatement){preparedStatement.close();下面我们来编写实体的尝试/确认/取消的处理函数@RequestMapping("barrierTransOutTry")publicObjectTransOutTry(HttpServletRequestrequest)throwsException{BranchBarrierbranchBarrier=newBranchBarrier(request.getParameterMap());logger.info("barrierTransOutTrybranchBarrier:{}",branchBarrier);交易请求交易sReq=提取(请求);连接connection=dataSourceUtil.getConnection();branchBarrier.call(connection,(barrier)->{System.out.println("用户:+"+transReq.getUserId()+",转出"+Math.abs(transReq.getAmount())+"元准备");this.adjustTrading(connection,transReq);});connection.close();返回TransResponse.buildTransResponse(Constant.SUCCESS_RESULT);}@RequestMapping("barrierTransOutConfirm")publicObjectTransOutConfirm(HttpServletRequestrequest)抛出异常logger.info("barrierTransOutConfirmbranchBarrier:{}",branchBarrier);连接connection=dataSourceUtil.getConnection();TransReqtransReq=提取(请求);branchBarrier.call(connection,(barrier)->{System.out.println("用户:+"+transReq.getUserId()+",转出"+Math.abs(transReq.getAmount())+"元提交");调整平衡(连接,tr回答请求);});connection.close();返回TransResponse.buildTransResponse(Constant.SUCCESS_RESULT);}@RequestMapping("barrierTransOutCancel")publicObjectTransOutCancel(HttpServletRequestrequest)throwsException{BranchBarriterMapbranchBarrier=newBranchBarrierMap));logger.info("barrierTransOutCancelbranchBarrier:{}",branchBarrier);TransReqtransReq=提取(请求);连接connection=dataSourceUtil.getConnection();branchBarrier.call(connection,(barrier)->{System.out.println("User:+"+transReq.getUserId()+",转出"+Math.abs(transReq.getAmount())+"元回滚");this.adjustTrading(connection,transReq);});connection.close();returnTransResponse.buildTransResponse(Constant.SUCCESS_RESULT);}//TransIn相关函数与TransOut类似,这里省略各个子交易的处理函数。在上面的代码中,下面一行是与子事务屏障相关的代码。只要你这样调用你的业务逻辑,子事务屏障保证你的业务逻辑在出现重复请求、挂起、空赔时不会被调用,保证正常业务的正确运行BranchBarrierbranchBarrier=newBranchBarrier(request.getParameterMap());branchBarrier.call(connection,(barrier)->{...});然后启动TCC事务并进行分支调用@RequestMapping("tccBarrier")publicStringtccBarrier(){//创建dmtclientDtmClientdtmClient=newDtmClient(ipPort);//创建tcc事务try{dtmClient.tccGlobalTransaction(dtmClient.genGid(),TccTestController::tccBarrierTrans);}catch(Exceptione){log.error("tccGlobalTransactionerror",e);返回“失败”;}return"success";}publicstaticvoidtccBarrierTrans(Tcctcc)throwsException{//用户1转账30元ResponseoutResponse=tcc.callBranch(newTransReq(1,-30)),svc+"/barrierTransOutTry",svc+"/barrierTransOutConfirm",svc+"/barrierTransOutCancel");log.info("outResponse:{}",outResponse);//用户2转账30元ResponseinResponse=tcc.callBranch(newTransReq(2,30),svc+"/barrierTransInTry",svc+"/barrierTransInConfirm",svc+"/barrierTransInCancel");log.info("inResponse:{}",inResponse);}至此,一个完整的TCC分布式事务就写完了。如果想运行成功的例子,按照dtmcli-java-示例项目的描述搭建环境并启动后,运行下面的命令运行tcc例子来curlhttp://localhost:8081/tccBarrierTCC回滚如果银行要转账给用户2,发现用户2的账户异常,退回失败怎么办?在我们的例子中,用户余额为10000,转账100000会触发异常失败:curlhttp://localhost:8081/tccBarrierError这是交易失败交互的时序图。这与成功的TCC的区别在于,当一个子事务失败时,随后回滚全局事务,调用每个子事务的Cancel操作,保证全局事务全部回滚。上面描述了一个非常符合预期的回滚情况。实际运行的业务中会有很多种回滚。比如TransIn请求在开始处理前出现异常,或者交易完成提交交易。在子事务屏障的帮助下,用户不需要关心这些不同的异常,它会被自动处理。TransIn处理过程中事务提交后可以尝试抛异常。可以看到本例最后一笔交易被正确回滚,两个用户的最终余额与转账前相同。小结在本文中,我们介绍了TCC的理论知识,并通过一个例子,给出了一个完整的TCC事务编写流程,涵盖了正常成功完成和成功回滚的情况。相信读者通过本文对TCC有了更深入的了解。分布式事务中需要处理的幂等、挂起、空值补偿,请参考另一篇文章:分布式事务的这些常见用法都有坑。让我们来看看正确的姿势,更全面地了解分布式事务。请参考分布式事务最经典的七个解决方案。本文所用示例选自yedf/dtmcli-java-sample。使用的分布式事务管理器为https://github.com/yedf/dtm,支持多种事务模式:TCC、SAGA、XA,事务消息跨语言支持,已支持golang、python、PHP、nodejs等语言客户。提供子事务屏障功能,优雅解决幂等、挂起、空值补偿等问题。看完这篇干货,欢迎大家访问https://github.com/yedf/dtm项目,给个star支持!
