当前位置: 首页 > 后端技术 > Java

事务消息的实现-RocketMQ知识体系6

时间:2023-04-01 21:02:15 Java

分布式事务是指事务的参与者、支持事务的服务器、资源服务器和事务管理器分别位于不同分布式系统的不同节点上。比如在大型电商系统中,订单接口通常会扣库存,扣折扣,生成订单id。但是,订单服务不同于库存、折扣和订单ID。下单接口的成功与否不仅取决于本地的db操作,还取决于第三方系统的结果。这时,分布式事务保证这些操作要么全部成功,要么全部失败。本质上,分布式事务是为了保证不同数据库中数据的一致性。目前分布式事物有seata、lcn等解决方案。RocketMQ分布式事务实现RocketMQ提供了事务消息的功能,采用2PC(两阶段协议)+补偿机制(事务审核)的分布式事务功能,通过RocketMQ版本的事务消息可以实现分布式事务的最终一致性消息队列。首先我们要知道什么是半事务消息和消息回传:半事务消息:暂时无法投递的消息,发送方已经成功将消息发送到消息队列RocketMQ服务器,但是服务器还没有收到生产者的回应消息的第二次确认。此时,邮件被标记为“暂时无法送达”。此状态下的消息是半事务性消息。消息审核:由于网络断开、生产者应用重启等原因,某笔交易消息的二次确认丢失。当消息队列RocketMQ版本服务器通过扫描发现某条消息长期处于“半事务消息”中时,需要主动向消息生产者查询该消息的最终状态(Commit或回滚)。查询过程是消息核对。【交互过程】事务消息发送步骤如下:发送方将半事务消息发送到消息队列RocketMQ版本服务器。消息队列RocketMQ版本服务器持久化消息成功后,返回Ack给发送方。确认消息发送成功,此时消息为半事务消息。发送方开始执行本地事务逻辑。发送方根据本地事务的执行结果向服务端提交二次确认(Commit或Rollback),服务端收到Commit状态后将半事务消息标记为可交付,订阅者最终会收到消息;服务器收到Rollback状态下的半事务消息被删除,订阅者不会接受该消息。交易消息审核步骤如下:在断网或应用重启等特殊情况下,上述第4步提交的二次确认最终没有到达服务器,服务端会在收到消息后发起消息审核固定的时间段。发送方收到消息校验后,需要校验相应消息本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态重新提交二次确认,服务端仍然按照步骤4对半事务消息进行操作。一般来说,RocketMQ事务消息分为两条主线:发送过程:发送半消息(halfmessage),执行本地交易,发送交易执行结果。定时任务审核流程:MQ定时任务扫描半消息,回查本地事务,发送事务执行结果的源码与发送事务半消息的Producer(prepare)相关。本地应用中发送事务消息的核心类是TransactionMQProducer。该类继承DefaultMQProducer以重用大部分与发送消息相关的逻辑。这个类的代码量很小。100行,下面是这个类的sendMessageTransaction方法。这里的transactionListener就是上面说的消息回溯类。它提供了2个方法:executeLocalTransaction执行本地事务checkLocalTransaction检查本地事务,然后查看DefaultMQProducer.sendMessageInTransaction()方法:该方法主要做了以下事情,给消息打上一个事务消息相关的标签,供broker使用区分普通消息和交易消息。发送半条消息。如果半消息发送成功,transactionListener将执行本地事务。执行endTransaction方法并告诉代理执行commit/rollback。执行本地事务Producer半事务消息发送成功后,会调用transactionListener.executeLocalTransaction方法执行本地事务。只有半消息发送成功后,才会执行本地事务,如果半消息发送失败,则设置回滚。结束事务(commit/rollback)本地事务执行完成后,根据本地事务的执行状态,调用this.endTransaction()方法提交或回滚事务。如果半消息发送失败或者本地事务执行失败,则通知服务器删除半消息。如果半消息发送成功,本地事务执行成功,则告诉服务器生效。Producer发送的消息最终会调用SendMessageProcessor.sendMessage()判断消息类型并存储消息。存储半消息代码prepareMessage(msgInner):这一步备份消息原主题名和原队列ID,然后取消交易消息的消息标签,重新设置消息主题为:RMQ\_SYS\_TRANS\_HALF\_TOPIC,queue的ID固定为0,区别于其他普通消息,然后完成消息持久化。至此,Broker已经初步处理了Producer发送的交易半消息。半消息事务核对两阶段协议发送和提交回滚消息。当本地事务消息执行后状态为UNKNOW时,事务结束,不做任何操作。通过定期查看交易状态,发送方的交易状态是回滚还是提交。使用TransactionalMessageCheckService线程定期检测主题RMQ\_SYS\_TRANS\_HALF\_TOPIC中的消息,回查消息的事务状态。RMQ\_SYS\_TRANS\_HALF\_TOPICprepare消息的主题,事务消息首先进入这个主题。RMQ\_SYS\_TRANS\_OP\_HALF\_TOPIC当消息服务器收到事务消息的提交或回滚请求时,会将消息存储在该主题下。Broker处理END\_TRANSACTION当Producer或定时任务提交/回滚事务时,Broker如何处理事务消息提交和回滚命令?其核心实现如下:如果根据commitlogOffset找到一条消息,如果是commit动作,则恢复原消息的主题和队列,重新存入commitlog文件,并传递到消息消费队列中进行消费者消费,然后将原始预处理后的消息存储在一个新主题中RMQ\_SYS\_TRANS\_OP\_HALF\_TOPIC表示消息已经处理回滚,将原始预处理后的消息直接存储在一个新主题中RMQ\_SYS\_TRANS\_OP\_HALF\_TOPIC,表示消息已经处理完毕。整体实现流程消费端消费失败怎么办?如果消息消费失败,会将失败的消息发回给broker,即重写commitLog文件,消费者重新消费;如果消息返回时consumer和broker之间的网络断开了,consumer会调用submitConsumeRequestLater()方法在consumer端重新消费。如果仍然消费失败,会继续重试,直到达到默认的16次。您可以使用msg.getReconsumeTimes()方法获取当前的重试次数。如果重试次数足够多之后,仍然无法消费成功,必须通过工单、日志等方式进行人工干预,让生产者事务回滚。Producer发送半消息失败可能是由于网络或mq故障,导致Producer订单系统无法发送半消息(prepare)。此时订单系统可以进行回滚操作,如“订单关闭”等,通过逆向流程向用户退款。半消息发送成功,但是本地事务执行失败。如果订单系统发送的半消息成功,但是本地事务执行失败,比如更新订单状态为“已完成”。这种情况下,本地事务执行失败后,会回滚给MQ,MQ会删除之前发送的一半消息。不会调用优惠券系统。半消息发送成功,但是没有收到MQ的响应。如果订单系统发送半消息成功,则不会收到MQ的响应。这时候可能是因为网络问题或者其他异常错误。订单系统误认为发送MQ半消息失败,进行了反向回滚处理。但是此时,实际上mq已经成功保存了一半的消息,那么如何处理这条消息呢?此时MQ的后台消息审核定时任务TransactionalMessageCheckService会每隔1分钟扫描一次半消息队列,判断是否需要进行消息审核,然后对订单系统的本地事务进行审核。这时,MQ会发现订单已经“完成”了。close”,这时候需要向mq发送回滚请求,删除前半条消息,如果commit/rollback失败,这其实是通过定时任务TransactionalMessageCheckService,会发现消息还没有处理到第二阶段在一定时间后,会回查本地事务总结消息队列RocketMQ分布式事务消息不仅可以实现应用程序之间的解耦,还可以保证数据的最终一致性。同时,可以将传统的大事务拆分成小事务,不仅可以提高效率,还可以防止因某个关联应用不可用而导致整体回滚,从而最大限度地提高核心系统的可用性。极端情况下,如果关联应用无法成功处理,只需要对当前应用进行补偿或数据更正处理,而不需要对整体业务进行回滚。RocketMQ事务消息链路体现了面向失败的设计思想,也体现了事务系统的严谨性。当第二阶段的消息没有投递时,broker会主动请求producer做check,producer就完成了。检查后,将再次返回交易状态。尽管有许多解决方案可以实现最终一致性,但事务性消息是更优雅的实现之一。