当前位置: 首页 > 后端技术 > Java

五分钟带你体验分布式事务!太简单!

时间:2023-04-01 16:38:20 Java

@[toc]网上关于分布式事务的理论很多,实战的比较少。今天想用一个案例让小伙伴们体验一下分布式事务。今天尽量少谈理论。我们今天的主角是西塔!分布式事务涉及到很多理论,比如CAP、BASE等,很多小伙伴看到这些理论都被劝退了,所以今天不讲理论,看个demo,快速感受下什么是分布式通过代码。正式事务。1.什么是西塔?Seata是一个开源的分布式事务解决方案,致力于提供高性能易用的分布式事务服务。Seata将为用户提供AT、TCC、SAGA和XA交易模式,为用户打造一站式的分布式解决方案。Seata支持四种事务模式:SeataAT模式SeataTCC模式SeataSaga模式SeataXA模式Seata有三个核心概念:TC(TransactionCoordinator)——事务协调器:维护全局和分支事务的状态,驱动全局事务提交或回滚。TM(TransactionManager)——事务管理器:定义全局事务的范围,启动全局事务,提交或回滚全局事务。RM(ResourceManager)-资源管理器:管理分支事务的资源(Resource),与TC对话以注册分支事务并报告分支事务的状态,并驱动分支事务的提交或回滚。其中,TC是单独部署的服务器,TM和RM是应用内嵌的客户端。如果你不知道这些概念,你可以使用Seata,你可以更好地理解Seata的工作原理。2.搭建Seata服务器让我们先搭建Seata服务器。Seata下载地址:https://github.com/seata/seata/releases最新版本是1.4.2,我们就用最新版本。这个工具部署在windows和linux上区别不大,所以我这里直接部署在windows上,这样比较方便。我们先下载1.4.2版本的zip压缩包,下载后解压,然后在conf目录下配置两个地方:首先配置file.conf,在文件file.conf中配置TC的存储方式。TC的三种存储模式:file:适用于单机模式,在内存中读写全局事务会话信息,持久化本地文件root.data,性能较高。db:适用于集群模式,全局事务会话信息通过db共享,相对性能较差。Redis:适用于集群模式,全局事务会话信息通过redis共享,性能较好,但需要注意的是Seata-Server1.3及以上版本支持redis模式,性能较高,但存在风险交易信息丢失,所以需要开发或提前配置适合当前场景的redis持久化配置。这里为了省事我们配置文件方式,让事务会话信息的读写在内存中完成,持久化写入本地文件,如下图:如果配置db或者redis方式,请记得在下方填写相关信息。具体如下图:题外话注意,如果使用db方式,需要提前准备好数据库脚本,如下(可以直接在公众号江南壹点鱼回复seata-db后台下载此数据库脚本):CREATEDATABASE/*!32312IFNOTEXISTS*/`seata2`/*!40100DEFAULTCHARACTERSETutf8mb4COLLATEutf8mb4_0900_ai_ci*//*!80016DEFAULTENCRYPTION='N'*/;USE`seata2`;/*表结构`branch_table`*/如果存在`branch_table`则删除表;CREATETABLE`branch_table`(`branch_id`bigint(20)NOTNULL,`xid`varchar(128)NOTNULL,`transaction_id`bigint(20)DEFAULTNULL,`resource_group_id`varchar(32)DEFAULTNULL,`resource_id`varchar(256)默认为空,`branch_type`varchar(8)默认为空,`status`tinyint(4)默认为空,`client_id`varchar(64)默认为空,`application_data`varchar(2000)默认为空,`gmt_create`日期时间(6)DEFAULTNULL,`gmt_modified`datetime(6)DEFAULTNULL,PRIMARYKEY(`branch_id`),KEY`idx_xid`(`xid`))ENGINE=InnoDBDEFAULTCHARSET=utf8;/*表`branch_table`的数据*//*表`global_t的表结构able`*/DROPTABLEIFEXISTS`global_table`;CREATETABLE`global_table`(`xid`varchar(128)NOTNULL,`transaction_id`bigint(20)DEFAULTNULL,`status`tinyint(4)NOTNULL,`application_id`varchar(32)DEFAULTNULL,`transaction_service_group`varchar(32)DEFAULTNULL,`transaction_name`varchar(128)DEFAULTNULL,`timeout`int(11)DEFAULTNULL,`begin_time`bigint(20)DEFAULTNULL,`application_data`varchar(2000)DEFAULTNULL,`gmt_create`datetimeDEFAULTNULL,`gmt_modified`datetimeDEFAULTNULL,PRIMARYKEY(`xid`),KEY`idx_gmt_modified_status`(`gmt??_modified`,`status`),KEY`idx_transaction_id`(`transaction_id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;/*表`global_table`的数据*//*表`lock_table`的表结构*/DROPTABLEIFEXISTS`lock_table`;CREATETABLE`lock_table`(`row_key`varchar(128)NOTNULL,`xid`varchar(128)DEFAULTNULL,`transaction_id`bigint(20)DEFAULTNULL,`branch_id`bigint(20)NOTNULL,`resource_id`varchar(256)DEFAULTNULL,`table_name`varchar(32)DEFAULTNULL,`pk`varchar(36)DEFAULTNULL,`gmt_create`datetimeDEFAULTNULL,`gmt_modified`datetimeDEFAULTNULL,PRIMARYKEY(`row_key`),KEY`idx_branch_id`(`branch_id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;另外还需要注意自己的数据库版本信息。更改数据库连接时,根据实际情况进行修改。Seata是针对MySQL5.x的,MySQL8.x和MySQL8.x都提供了相应的数据库驱动(在lib目录下),我们只需要修改驱动然后配置registry.conf文件即可。registry.conf主要是配置Seata的注册中心。Eureka,配置如下:可以看到,支持的配置中心很多。我们选择尤里卡。选择配置中心后,记得修改配置中心相关的信息。OK,至此配置完成,先不要启动,还有一个Eureka注册中心。3.项目配置接下来我们配置项目。Seata官方提供了一个非常经典的demo,我们直接来看这个demo。官方案例下载地址:https://github.com/seata/seata-samples但是这里很多案例混在一起,可能看起来很乱,而且因为要下载的依赖比较多,所以很可能会导致依赖下载失败,所以大家也可以在公众号后台回复seata-demo获取松哥整理的案例,直接导入即可,如下图:这是一个订货的案例,我来解释一下你一点:尤里卡:这是服务注册表。account:这是账户服务,可以查询/修改用户的账户信息(主要是账户余额)。order:这是订单服务,可以在这里下订单。storage:这是一个仓储服务,可以查询/修改商品的库存数量。bussiness:这里是business,用户的订单会在这里完成。这个案子是关于什么的?用户下单时,调用业务中的接口,业务中的接口调用自己的服务。服务中先开启全局分布式事务,然后通过feign调用storage中的接口扣除Inventory,然后通过feign调用接口创建订单(order在创建的时候不会只创建订单一个订单,还要扣除用户账户的余额)。扣除库存并完成订单创建后,下一步是检查用户余额和库存数量是否正确,如果用户余额或库存数量为负数,则回滚交易,否则提交交易。本案例的具体架构如下图所示:本案例是一个典型的分布式事务问题。存储和订单中的事务属于不同的微服务,但我们希望它们同时成功或同时失败。既然大家都明白了这个案例是怎么回事,那我们就来运行一下。先创建一个名为seata的数据库,然后执行上面代码中的all.sql数据脚本。接下来,用idea打开上面的项目,修改每个项目的application.properties文件中的数据连接信息(Eureka不用改),如下图:除了Eureka,其他四个都要变了。OK,配置结束。4.开始测试先启动Eureka。接下来别忘了启动其他服务,首先启动SeataServer,也就是我们在第二节配置的服务,在它的bin目录下,在Windows下双击/执行Linux下的启动脚本。最后分别启动剩下的四个服务。启动完成后,我们可以查看Eureka中的相关信息:可以看到,所有的服务都注册好了。接下来我们访问业务中提供的两个测试接口。第一个测试接口是:http://127.0.0.1:8084/purchase/commit这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseCommit,这个地方是模拟购买30的U100000用户C100000商品,每件商品的价格为100,商品库存为200,用户账户余额为10000,所以购买后商品库存变为170,用户账户余额变为7000。这是正常购买的情况。@RequestMapping(value="/purchase/commit",produces="application/json")publicStringpurchaseCommit(){try{businessService.purchase("U100000","C100000",30);}catch(Exceptionexx){返回exx.getMessage();}return"Globaltransactioncommit";}我们调完这个接口后,就可以去数据库中查看对应的数据了。第二次测试的接口是:http://127.0.0.1:8084/purchase/rollback这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseRollback,这次是模拟用户去购买了99999件商品,无论是用户账户余额还是库存商品数量都无法支持此次购买,所以最终会回滚该接口的调用,数据库中的数据将保持原样。@RequestMapping("/purchase/rollback")publicStringpurchaseRollback(){try{businessService.购买(“U100000”,“C100000”,99999);}catch(Exceptionexx){返回exx。获取消息();}return"Globaltransactioncommit";}这是一个分布式事务案例。有兴趣的朋友也可以研究一下官方案例。我们会发现这里的东西很简单。它只是对以下方法(io.seata.sample.service.BusinessService#purchase)的附加注解:@GlobalTransactionalpublicvoidpurchase(StringuserId,StringcommodityCode,intorderCount){orderFeignClient.create(userId,commodityCode,orderCount);if(!validData()){thrownewRuntimeException("账户或库存不足,执行回滚");}}purchase方法打上@GlobalTransactional注解,启动全局事务。里面的两个调用都是feign调用,对应不同的服务,最后做一个数据校验。如果测试失败,则抛出异常。一旦方法抛出异常,上面已经执行的代码就会回滚。本项目其余代码为微服务中的常规代码,不再赘述。5.实现原理下面简单说一下Seata中分布式事务的原理。首先我们来看一张图:这张图清楚的描述了上面的案例。大致流程如下:有TM、RM、TC三个概念,我们在第一节已经介绍过,这里不再赘述。首先全局事务由Business启动。接下来,当Business调用Storage和Order时,两者都会向TC注册一个分支事务,并在数据库操作前提交。当一个分支事务在运行时,会向undo_log表提交一条记录。当全局事务提交时,会清除undo_log表中的记录,否则,会根据表中的记录进行反向补偿(将数据恢复到原来的状态)。具体到上面的案例,事务提交分为两个阶段,流程如下:Stage1:首先,Business启动全局事务,在这个过程中会向TC注册,然后会得到一个xid,这是一个全局事务ID。接下来调用业务中的存储微服务。解析SQL:获取SQL类型(UPDATE)、表(storage_tbl)、条件(wherecommodity_code='C100000')等相关信息。查询前镜像:根据分析得到的条件信息生成查询语句,定位数据。执行业务SQL,即进行真正的数据更新操作。后查询镜像:根据前镜像的结果,通过主键定位数据。插入回滚日志:将前后镜像数据和业务SQL相关信息组合成一条回滚日志记录,插入到UNDO_LOG表中。branch_id和xid分别代表分支事务(即Storage自己的事务)和全局事务的id,rollback_info存放的是前后镜的内容,会作为反向补偿(rollback)的依据。这个字段的值是一个JSON,宋哥从这个JSON中挑出一个重要的部分分享给大家:beforeImage:这是修改前数据库中的数据,可以看到每个字段的值,id为4,计数为200。afterImage:这是修改后数据库中的数据。可以看到id为4,count为170。在commit之前,Storage会向TC注册分支:为storage_tbl表中主键值等于4的记录申请全局锁。本地事务提交:业务数据的更新与前面步骤生成的UNDOLOG一起提交。同样,Order和Account也按照上述步骤提交数据。以上步骤1-10是数据提交的第一阶段。再来看第二阶段:第二阶段有两种可能,commit或者rollback。仍以上述案例为例:@GlobalTransactionalpublicvoidpurchase(StringuserId,StringcommodityCode,intorderCount){storageFeignClient.deduct(commodityCode,orderCount);orderFeignClient.create(userId,commodityCode,orderCount);if(!validData()){thrownewRuntimeException("账户或库存不足,执行回滚");}}下订单时,扣除库存并创建订单。最后检查发现库存为负或者用户账户余额为负,说明这个订单有问题,是时候抛出异常回滚,否则提交数据。具体操作如下:回滚:收到TC的分支回滚请求后,启动一个本地事务,执行以下操作。使用xid和branch_id在undo_log表中找到对应的记录。数据验证:将第二步找到的后镜像与当前数据进行对比。如果有差异,则意味着数据已被当前全局事务以外的操作修改。这种情况需要根据配置策略进行处理。如果第三步比较相同,则根据undo_log中的前像和业务SQL信息生成并执行回滚语句。提交本地事务。并将本地事务的执行结果(即分支事务回滚的结果)报告给TC。提交:收到TC的分支提交请求后,将请求放入异步任务队列,立即返回提交成功的结果给TC。异步任务阶段的分支提交请求,会异步批量删除对应的UNDOLOG记录。也就是说,如果事务正常提交,undo_log表中是不会有任何记录的。如果要查看这个表中的记录,可以在事务提交前通过DEBUG查看。6.总结说了这么多,是不是Seata就完蛋了?不不不!这只是AT模式!还有其他三种模式,松哥会在下一篇文章中分享给小伙伴们。好了,这就是一个简单的分布式事务,小伙伴们先来体验一下吧!题目是五分钟感受一个分布式事务,因为我在文章里也跟大家分享了原理。如果只是跑case体验一下,五分钟应该够了。如果你不相信我,试试吧!