当前位置: 首页 > 科技观察

哪种分布式事务处理方案效率最高?一定是...

时间:2023-03-17 18:42:44 科技观察

在之前的文章中,宋哥给大家介绍了Seata中的四种分布式事务处理方案。相信通过前面几篇文章的学习,大家对Seata中的分布式事务有了一定的了解。.没看过上一篇的朋友可以先看看:五分钟带你体验分布式事务!太简单!看了那么多博客,还是不明白TCC,不妨看看这个案例!XA交易很水很深,小子,怕你抓不住!你的Saga交易保证“隔离”吗?但是很多朋友看了之后觉得Seata的分布式事务处理代码简单,但是内部在网络上耗费的时间。太多了,在高并发场景下,这似乎不是一个好的解决方案。要说哪种分布式事务处理方案高效,一定绕不开消息中间件!基于消息中间件的两阶段提交方案通常用于高并发场景。这种方式牺牲了数据的强一致性来换取性能上的大幅提升,但是实现这种方式的成本和复杂度都比较高,使用时要根据实际业务情况而定。今天宋哥想用一个简单的案例和大家聊聊如何通过消息中间件来处理分布式事务。一、思路分析先说一下整体思路。有一个名词叫消息驱动的微服务,相信很多朋友都听说过。你怎么理解的?在微服务系统中,我们可以使用HTTP,比如OpenFeign,或者RPC,比如Dubbo,来实现服务之间的相互调用。除了这些方案,我们还可以使用消息驱动,这是典型的响应式系统设计方案。在消息驱动的微服务中,服务不再直接相互调用。当服务需要通信时,将通信内容发送给消息中间件,另一个服务在消息中间件中监听消息队列。完成相应的业务逻辑调用,流程就是这么一个过程,不难,怎么玩,我们继续往下看。2.业务分析折腾了很久,然后宋哥在网上找了一个别人写的例子。我觉得很适合演示这个问题,所以没有自己写案例,直接用了别人的代码。让我们一一进行。分析和上面提到的分布式事务Seata的方式是一致的。首先我们看下面的流程图,是一个用户买票的案例:当用户要买票时:向新的订单队列写入一条数据。订单服务负责消费这个队列中的消息,完成订单的创建,然后向新的订单支付队列写入消息。UserService负责消费新订单支付队列中的消息,在UserService中完成用户账户余额的扣款,然后向新订单转账队列中写入消息。TicketService负责消费新的订单转移票队列,在TicketService中完成票转移,然后向订单完成队列发送消息。最后,订单服务负责监控订单完成队列并处理完成的订单。这是一个典型的消息驱动的微服务,也是一个典型的响应式系统。在这个系统中,一共有三个服务,分别是:OrderServiceUserServiceTicketService这三个服务之间不会直接调用。大家会直接发给消息中间件,其他服务会从消息中间件中获取自己想要的消息,然后进行处理。具体在我们的实践中,多了一个检查ticket是否充足的流程,如下图所示:在创建订单时,Ticket服务首先检查ticket是否充足,然后继续发起订单的创建如果没有问题就下单。其他流程我就不说了。另外需要注意的是,在售票系统中,由于每张票都是不同的,比如每张票可能有座位什么的,所以一张票往往被设计成数据库中的一条记录。3.我把练习过程解释清楚了。接下来我们就来看看具体的代码实践。3.1准备数据库首先我们准备三个数据库,分别是:javaboy_order:订单库,用户创建订单等操作,都在这个数据库中完成。javaboy_ticket:票证库,存放所有的票证信息,每张票证就是一条记录,存放在这个库中。javaboy_user:用户库,存放用户的账户余额和支付记录。每个库都有自己对应的表。为了操作方便,这些表不需要自己创建。以后项目启动时,可以使用JPA自动创建。3.2项目概况让我们从整体上看一下这个项目。公众号后台回复mq_tran下载完整代码:一共有五个服务:eureka:注册中心order:订单服务service:公共模块ticket:票务服务user:用户服务单独说吧。3.3注册中心有人说都是消息驱动,为什么还要注册中心?消息驱动是对的。消息驱动微服务后,每个服务只能向消息中间件抛出消息,每个服务也只能消费消息中间件。从上面的消息来看,这个时候好像对服务注册中心没有那么强烈的需求。但是在我们的例子中,消息驱动主要是用来处理事务问题的,其他的常规需求我们还是使用OpenFeign来处理,所以这里还是需要一个注册中心。这里的注册中心,我选择的是普通的Eureka,比较省事。由于这篇文章主要是分布式事务,所以我就简单介绍一下微服务相关的东西,不会占用太多篇幅。如果不熟悉SpringCloud的用法,可以在公众号套视频演示后台回复vhr。记得在服务注册中心的创建中加入SpringSecurity来保护你自己的服务注册中心。这块有个小细节想跟大家多说几句。Eureka被SpringSecurity保护后,以后其他服务注册都会通过HttpBasic进行认证,所以我们需要在代码中开启HttpBasic认证,如下(以下代码老版本不需要,新版本需要)版本确实如此):@ConfigurationpublicclassSecurityConfigextendsWebSecurityConfigurerAdapter{@Overrideprotectedvoidconfigure(HttpSecurityhttp)throwsException{http.authorizeRequests().anyRequest().authenticated().and().httpBasic().and().formLogin().and().csrf().disable();}}3.4票务服务接下来我们来看票务服务。购票是从下单开始的,所以我们从订单服务下单开始分析整个流程。3.4.1新订单处理(order)当用户发起购票请求时,请求被发送到订单服务,订单服务首先发送消息到order:newqueue,开始订单处理流程。代码如下:@Transactional@PostMapping("")publicvoidcreate(@RequestBodyOrderDTOdto){dto.setUuid(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("order:new",dto);}上面设置的UUID是整个订单处理过程中的唯一标识,也可以看做是一条主线。order:new队列中的消息会被ticketservice消费,ticketservice会消费order:new中的消息并进行ticketlock操作(锁定ticket的目的是为了防止两个消费者同时购买同一张票)。票被锁定成功后,票务服务会向order:locked队列发送消息,表示票被锁定成功;否则,它会向order:fail队列发送一条消息,表明票证锁定失败。这里的OrderDTO对象会贯穿整个购票流程。3.4.2在票务服务中完成票务锁定操作,代码如下:@Transactional@RabbitListener(queues="order:new")publicvoidhandleTicketLock(OrderDTOmsg){LOG.info("Getneworderforticketlock:{}",msg);intlockCount=ticketRepository.lockTicket(msg.getCustomerId(),msg.getTicketNum());if(lockCount==0){msg.setStatus("TICKET_LOCK_FAIL");rabbitTemplate.convertAndSend("order:fail",msg);}else{msg.setStatus("TICKET_LOCKED");rabbitTemplate.convertAndSend("order:locked",msg);}}首先调用lockTicket方法锁定数据库中的ticket,所谓lockedticket就是票待购买的lock_user字段设置为customer_id(买家的id)。如果票被锁定成功(即修改数据库成功),则设置msg的状态为TICKET_LOCKED,同时向order:locked队列发送消息,表示票被锁定成功。如果票锁失败(即修改数据库失败),则设置msg的状态为TICKET_LOCK_FAIL,同时向order:fail队列发送消息,表示票锁失败。3.4.2成功锁票(订单)接下来,订单服务消费order:locked队列中的消息,这是锁票成功后的下一步操作。@Transactional@RabbitListener(queues="order:locked")publicvoidhandle(OrderDTOmsg){LOG.info("Getnewordertocreate:{}",msg);if(orderRepository.findOneByUuid(msg.getUuid())!=null){LOG.info("Msgalreadyprocessed:{}",msg);}else{Orderorder=newOrder(msg);orderRepository.save(order);msg.setId(order.getId());}msg.setStatus("NEW");rabbitTemplate.convertAndSend("order:pay",msg);}锁票成功后,先去订单数据库根据订单的UUID查询是否有订单记录。如果是,则表示该消息已被处理。你可以防止重复处理订单(这个主要是为了解决幂等性的问题)。如果订单还没有被处理,则创建一个新的订单对象并保存在数据库中。创建新的订单对象时,需要将订单的状态设置为NEW。最后,将msg的状态设置为NEW,然后向order:pay队列发送消息,开始支付流程。付款由用户服务提供。用户服务会检查用户的账户余额是否足够。如果不是,会向order:ticket_error队列发送消息,提示订票失败;如果余额足够,支付成功后会进行正常的支付操作并发送。消息被发送到order:ticket_move队列以开始票据传输。3.4.3支付(用户)锁定成功后,进行下一步支付,付费服务由用户提供。@Transactional@RabbitListener(queues="order:pay")publicvoidhandle(OrderDTOmsg){LOG.info("Getnewordertopay:{}",msg);//先查看payInfo,判断重复消息。PayInfopay=payInfoRepository.findOneByOrderId(msg.getId());if(pay!=null){LOG.warn("Orderalreadypaid,duplicatedmessage.");return;}Customercustomer=customerRepository.getById(msg.getCustomerId());if(customer.getDeposit()orders=orderRepository.findAllByStatusAndCreatedDateBefore("NEW",checkTime);orders.stream().forEach(order->{LOG.error("Ordertimeout:{}",order);OrderDTOdto=newOrderDTO();dto.setId(order.getId());dto.setTicketNum(order.getTicketNum());dto.setUuid(order.getUuid());dto.setAmount(order.getAmount());dto.setTitle(order.getTitle());dto.setCustomerId(order.getCustomerId());dto.setStatus("TIMEOUT");rabbitTemplate.convertAndSend("order:ticket_error",dto);});}可以看出这里是去数据库中检索那些状态为NEW和1分钟前的订单。根据前面的分析,当工单成功锁定后,订单的状态会被设置为NEW,并存入数据库。也就是说,当一分钟锁票成功,还没有售出时,设置订单超时时间,同时向order:ticket_error队列发送消息。该消息在票务服务中被消费,以及取消票务发送和取消票务锁定等操作。代码处理流程大致就是这样。我们再回顾一下上一张图:这张图结合代码看是不是很容易理解?3.5测试接下来我们来做一个简单的测试。先来一个订票失败的测试,如下:由于用户只有1000元,这张票要10000元,所以购票一定失败。请求执行成功后,我们查看订单表,多了一条记录如下:可以看到,订单失败的原因是账户余额不足。这时候查看ticket和user表,发现都完好无损(如果需要的话,已经进行了反向补偿)。接下来,我们手动给ticket表中的lock_user字段设置一个值,如下:表示该ticket已经被人锁定了。然后我们发起一个购票请求(这个时候可以把金额设置到一个合理的范围内,其实不设置也没关系,反正这次失败还没有到支付这一步):请求发送成功后,我们接下来检查订单表。多了一条记录如下:可以看出这次下单失败的原因是票锁失败。这时候查看ticket和user表,发现都完好无损(如果需要的话,已经进行了反向补偿)。最后再测试一次成功,先清空ticket表中的lock_user字段,然后发送如下请求:这次购票成功,查看ticket表,invoice已经属于ticket:查看order表:有可以多一条购票成功记录。查看用户表:用户账户已被扣款。查看付款记录表:可以看到已经有付款记录了。4.小结总体来说,上述案例在技术上难度不大,难点在于设计。一开始需要设计消息处理流程,消息处理失败后如何补偿。这是对每个人技能的考验。另外,在上面的案例中,消息的发送和消费都使用了RabbitMQ中的事务机制(保证消息消费成功)和Spring中的事务机制(同时保证消息发送和数据存储成功)时间),这里不再赘述。简而言之,分布式事务是通过消息中间件进行处理的。这种方式牺牲了数据的强一致性来换取性能上的大幅提升,但是实现这种方式的成本和复杂度都比较高。使用时,要根据实际业务情况而定。.本文转载自微信公众号“江南的一场小雨”,可通过以下二维码关注。转载本文请联系江南一点鱼公众号。