本文转载请联系菜鸟飞亚飞公众号。前言在我遇到的几家大厂,几乎每一轮面试官(“没错,几乎每一轮面试官”)都问了同一个问题:你们的系统是分布式系统吗?答:是的。面试官:那你们的分布式系统是怎么解决分布式事务的问题的?即如何保证数据的一致性。答:在我们的系统中,使用RocketMQ事务消息来保证数据的最终一致性。面试官:那你说说它是怎么保证数据的最终一致性的?答:分两部分回答。第一部分首先回答交易消息的实现过程,第二部分解释为什么它可以保证数据的最终一致性。.事务消息的实现过程事务消息首先,服务A向MQ发送一条半事务消息(也称半消息)。为什么要先发送半条消息?这是为了保证服务A和MQ之间的通信是正常的。如果无法正常通信,服务A可以直接返回异常,无需处理背后的逻辑。如果半消息发送成功,MQ收到半消息后会返回成功响应给服务A。服务A收到MQ返回的成功响应后,开始处理本地业务逻辑,提交本地事务。如果服务A的本地事务提交成功,会向MQ发送commit,表示提交了一半消息,MQ会执行第五步;如果服务A的本地事务提交失败,则直接回滚本地事务发送给MQ发送回滚就是回滚之前的半条消息,MQ收到回滚消息后会删除这条半条消息。如果提交,将一半消息写入磁盘。如果MQ长时间没有收到提交或回滚消息,例如:服务A在处理本地业务时宕机,或者发送的提交和回滚由于网络环境较弱而丢失。然后MQ会在一定时间后尝试调用服务A提供的一个接口,通过这个接口判断半条消息的状态。因此,服务A提供的接口需要实现业务逻辑:根据数据库中对应数据的状态,判断上半消息对应的业务是否执行成功。如果MQ从该接口得知半消息执行成功,则MQ将半消息持久化到本地磁盘,如果知道执行不成功,则删除该半消息。服务B从MQ中消费相应的消息。服务B处理本地业务逻辑,然后提交本地事务。如何保证数据的最终一致性实现过程说完了,现在你可能会有各种疑惑?问:什么是半消息?A:和我们正常发送的普通消息是一样的,都是存储在MQ中的,唯一不同的是一半不会在MQ中立即被消费者消费,除非一半的消息被commit。(至于为什么未提交的半消息不能被消费者读取到,这是因为在MQ内部,对于事务性消息,在提交之前,会先放入一个内部队列中,只有提交之后,才会真正发送消息放置在消费者可以阅读的主题队列中)问:为什么先发送一半的消息?A:前面已经解释过了,主要是保证服务A和MQ之间可以正常通信。就算不能正常通信,待会儿玩个锤子,直接返回异常就好了。Q:如果MQ收到了半条消息,但是在返回成功响应时,由于网络原因服务A收不到成功响应,这时候是什么现象?A:服务A发送半条消息时,会等待MQ给自己返回一个成功的响应,如果没有收到,那么服务A会直接结束,返回异常,不会执行后续逻辑。后续逻辑不执行,所以服务A不会向MQ提交commit消息。如果MQ长时间没有收到commit消息,会主动回调服务A的一个接口,服务A通过接口查询本地数据后,发现如果这条消息对应的业务没有正常执行,那么告诉MQ这半条消息不能提交,需要回滚。MQ知道后,会删除半条消息。Q:服务A本地事务执行失败怎么办?A:服务A的本地事务执行失败后,先回滚自己的本地事务,然后向MQ发送回滚操作。Q:服务A本地事务提交成功或失败后,由于网络问题发送给MQ的提交或回滚消息丢失,怎么办?A:和上一个问题一样,MQ长时间没有收到half消息的commit或者rollback消息,MQ会主动回调服务A的接口,通过这个接口来判断如何处理half消息。Q:上面说的都是交易消息的实现过程。这与事务消息如何保证数据的最终一致性有什么关系呢?答:是的。首先服务A执行本地事务和向MQ提交发送消息,这是两个写操作。然后通过RocketMQ事务消息,我们保证这两个写操作要么成功,要么失败。然后让其他系统,比如服务B去消费MQ中的消息,然后执行自己的本地事务,这样到最后,服务A和服务B两个系统的数据状态是一致的吗?这就是最终一致性的意义。如果需要服务A和服务B的数据状态,则服务A返回给客户端时两者一致。这就是强一致性,RocketMQ不能保证强一致性。目前,“通过可靠消息保证数据的最终一致性”是很多大厂采用的方案,基本都是利用MQ和补偿机制来保证数据的一致性。(所谓消息可靠,就是消息不会丢失,如何保证MQ消息不丢失,下篇文章会写,这也是面试常见问题。)Q:我应该怎么做如果服务B本地事务提交失败?A:如果服务B的本地事务失败,如果事务提交失败,可以多次重试,直到成功。如果多次重试提交失败,比如此时服务B对应的DB宕机了,只要此时服务B不将这条消息的offset提交给MQ即可。如果offset没有提交,MQ会在一定时间后继续推送这条消息给服务B,服务B才能继续执行本地事务并提交,直到成功。这样一来,还是保证了服务A和服务B的数据的最终一致性。使用RokcetMQ事务消息的代码实现主要涉及两部分:如何发送半事务消息,可以通过“TransactionMQProducer”类来实现。TransactionMQProducertransactionMQProducer=newTransactionMQProducer("producerGroup");TransactionSendResultresult=transactionMQProducer.sendMessageInTransaction(msg,null);//用result判断半消息是否发送成功if(result.getSendStatus()==SendStatus.SEND_OK){//Success}else{//Failure}前面我们提到,服务A需要提供一个接口让MQ回调服务A,其实这个接口就是一个listener:"TransactionListener"方法。这是一个提供两种方法的接口。publicinterfaceTransactionListener{//当半条消息发送成功后,我们在这里实现自己的业务逻辑,然后提交或回滚到MQLocalTransactionStateexecuteLocalTransaction(finalMessagemsg,finalObjectarg);//这个方法是MQ回调的方法,MQ通过回调来判断这个方法半条消息的状态//可以看到这个方法的参数是MessageExt,就是半条消息的内容。根据MessageExt,我们完全可以判断之前的业务在服务A中是否处理成功了。{//处理业务逻辑//....//业务逻辑处理成功,commitreturnLocalTransactionState.COMMIT_MESSAGE;}catch(Exceptione){}//业务处理失败,rollbackreturnLocalTransactionState.ROLLBACK_MESSAGE;}@OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){returnnull;}}另外,在创建producer时,指定我们实际发现的监听器TransactionMQProducertransactionMQProducer=newTransactionMQProducer("producerGroup");transactionMQProducer.setTransactionListener(newMyTransactionListener());分布式事务汇总是一个大工厂面试题是必须的,目前大多数公司都是通过可靠的信息来保证数据的最终性,通常使用RocketMQ来实现。如果你想去阿里,我建议MQ,选择RocketMQ多复习一下。
