当前位置: 首页 > 科技观察

使用RocketMQ实现可靠的消息最终一致性方案,前面写过yyds

时间:2023-03-13 08:11:11 科技观察

对于常见的微服务系统,大部分接口调用都是同步的,即一个服务直接调用另一个服务的接口。这时候使用TCC分布式事务方案来保证各个接口的调用要么一起成功要么一起回滚就比较合适了。但是在实际系统的开发过程中,服务之间的调用可能是异步的。也就是说,一个服务向MQ发送消息,即消息中间件,如RocketMQ、RabbitMQ、Kafka、ActiveMQ等。然后,另一个服务使用来自MQ的消息并对其进行处理。这就变成了基于MQ的异步调用。那么对于这种基于MQ的异步调用,如何保证服务间的分布式事务呢?也就是说,我希望的是基于MQ实现异步调用的多个服务的业务逻辑,要么一起成功,要么一起失败。这时候就需要使用可靠的消息最终一致性方案来实现分布式事务。看上图。其实,如果不考虑高并发、高可用等各种技术挑战,仅从“可靠消息”和“最终一致性”的角度来看,这种分布式事务方案还是比较简单的。可靠消息最终一致性方案的核心流程1.上游服务下发消息。如果要实现可靠消息的最终一致性方案,一般可以自己写一个可靠消息服务来实现一些业务逻辑。首先,上游服务需要向可靠的消息服务发送消息。这条消息很通俗,可以认为是调用下游服务的一个接口,里面包含了一些相应的请求参数。然后,可靠的消息服务必须将这条消息存储在自己的数据库中,状态为“等待确认”。然后,上游服务可以执行自己本地的数据库操作,并根据自己的执行结果再次调用可靠消息服务的接口。如果本地数据库操作成功执行,则使用可靠的消息服务来确认该消息。如果本地数据库操作失败,则寻找可靠的消息服务来删除该消息。如果此时是确认消息,可靠消息服务会将数据库中的消息状态更新为“已发送”,同时将消息发送给MQ。这里有一个关键点,就是更新数据库中的消息状态,并将消息投递给MQ。这两个操作,你要放在一个方法里,你要开一个本地事务。你是什??么意思?如果更新数据库中消息的状态失败,则抛出异常退出,不传递给MQ;如果MQ下发失败报错,则必须抛出异常让本地数据库事务回滚。这两个操作必须一起成功,否则一起失败。如果上游服务通知删除消息,则可靠消息服务必须删除消息。2.下游服务收到消息。下游服务一直在等待消费来自MQ的消息。如果消息被消费,那么它将操作自己的本地数据库。如果操作成功,则依次通知可靠消息服务,说处理成功,然后可靠消息服务将消息的状态设置为“完成”。3、上游服务如何传递100%可靠的消息?上面的核心流程大家都看过了:一个很大的问题是,如果上面传递消息的过程中各个环节出现问题怎么办?消息从上游服务传递到下游服务,如何保证100%可靠传递?别着急,下面我们一一分析。如果在将上游服务需要确认的消息发送给可靠消息服务的过程中出现错误,也没有关系。上游服务可以感知到调用的异常,所以不需要执行后面的流程,是没有问题的。如果上游服务操作完本地数据库,通知可靠的消息服务确认消息或删除消息,就会出现问题。例如:通知不成功,或者执行不成功,或者可靠的消息服务未能将消息投递到MQ。这一系列步骤出现问题怎么办?这并不重要,因为在这些情况下,该消息在可靠消息服务数据库中的状态将始终为“等待确认”。至此,我们在可靠的消息服务中开发一个线程在后台定时运行,不断的检查每条消息的状态。如果一直处于“待确认”状态,则认为消息有问题。这时候你可以回调上游服务提供的一个接口,问问大哥,这条消息对应的数据库操作执行成功了吗?如果上游服务回复我执行成功,那么可靠的消息服务会修改消息状态为“已发送”,同时将消息投递给MQ。如果上游服务回复执行不成功,那么可靠的消息服务可以删除数据库中的消息。通过这种机制,可以保证可靠的消息服务会尝试完成消息到MQ的投递。4、如何保证下游服务100%可靠接收消息?那么如果下游服务消费消息有问题,不消费?或者下游服务处理消息失败,怎么办?其实没关系,在靠谱的消息服务中开发一个后台线程,不断的查看消息的状态。如果消息的状态一直是“Sent”,一直没有变成“Completed”,说明下游服务一直没有处理成功。这时可靠的消息服务可以尝试再次将消息重新投递给MQ,以便下游服务重新处理。只要下游服务的接口逻辑做到幂等,就足以保证一条消息被多次处理,不插入重复数据。5、如何基于RocketMQ实现可靠的消息最终一致性方案?在上面的总体方案设计中,完全依赖可靠消息服务的各种自检机制来保证:如果上游服务的数据库操作失败,下游服务将不会收到任何通知;如果上游服务的数据库操作成功,reliable消息服务将确保将调用消息传递给下游服务,并且它会确保下游服务必须成功处理该消息。通过这种机制,保证了基于MQ异步调用/通知的服务间的分布式事务保证。其实阿里开源的RocketMQ实现了可靠消息服务的所有功能,核心思想和上面类似。只是为了保证高并发、高可用、高性能,RocketMQ实现了一个比较复杂的架构,这点很好。有兴趣的同学可以自行查看RocketMQ对分布式事务的支持。可靠消息最终一致性方案高可用保障生产实践一、背景介绍其实很多同学应该都知道上面的方案和思路是怎么回事,我们主要是在铺垫这套理论思路。在实际生产中,如果没有高并发场景,可以参考上述思路,基于某个MQ中间件开发可靠的消息服务。如果有高并发的场景,可以使用RocketMQ的分布式事务支持,上面的一套流程就可以实现。今天和大家分享的一个核心话题就是如何保证这个方案99.99%的高可用。其实大家应该已经发现,这个解决方案中保证高可用最大的依赖就是MQ的高可用。任何一种MQ中间件,无论是RabbitMQ、RocketMQ还是Kafka,都有一套完整的高可用保障机制。因此,在大公司使用可靠的消息最终一致性方案时,我们对可用性的保证通常依赖于公司基础设施团队对MQ的高可用保证。也就是说大家要相信小弟团队,99.99%可以保证MQ的高可用,绝对不会因为MQ集群整体宕机导致公司业务系统所有分布式事务失败。但现实是残酷的。很多中小型公司,甚至一些中大型公司,都或多或少遇到过MQ集群整体故障的情况。一旦MQ完全不可用,将导致业务系统的各种服务无法通过MQ传递消息,从而导致业务流程中断。比如最近朋友的公司也在做电商业务。公司机器上MQ中间件部署的集群整体故障不可用,导致所有依赖MQ的分布式事务无法运行,大量业务流程。中断情况。在这种情况下,就需要为这个分布式事务解决方案实现一个高可用的保障机制。2、基于KV存储的队列支持的高可用降级方案我们看下图。这是我曾经指导朋友做可靠的消息最终一致性方案的公司设计的高可用保证降级机制。这个机制并不太复杂,可以非常简单有效的保证友商的高可用保障场景。一旦MQ中间件出现故障,它会立即自动降级为备份解决方案。(1)MQ客户端组件的自封装和故障感知首先,如果要自动感知MQ故障,然后自动完成降级,那么就必须手动封装MQ客户端,发布到公司的Nexus私服上。那么公司需要支持MQ降级的业务服务就是使用这个自己封装的组件向MQ发送消息,消费MQ的消息。在自己封装的MQ客户端组件中,可以根据写入MQ的情况来判断MQ是否故障。例如,如果发现给MQ投递消息连续重试10次,报异常错误,网络无法连接等,说明MQ有问题。此时可自动感应并自动触发降级开关。(2)基于kv存储中队列的降级方案如果MQ挂了,要想继续传递消息,就必须找一个MQ的替代者。比如我朋友的公司没有高并发场景,消息量很小,但是可用性要求很高。此时可以换成类似于redis的kv存储的队列。由于redis本身支持队列的功能以及各种类似队列的数据结构,所以可以将消息写入kv存储格式的队列数据结构中。ps:关于redis的数据存储格式、支持的数据结构等基础知识请自行查看,网上一大堆。不过这里有几个大坑,一定要注意。首先,对于任何存储在kv中的集合数据结构,建议不要向其中写入过多的数据,否则会导致大值的出现,造成严重的后果。所以,想在redis中创建一个key,然后拼命往这个数据结构中写入消息是绝对不可能的。第二,绝对不能在少数key对应的数据结构中连续写入数据,这样会导致hotkey的产生,即某些key特别hot。大家应该都知道,一般kv集群是根据key的hash分配给每台机器的。如果一直写几个key,会导致kv集群中某台机器访问过多,负载过大。基于以上考虑,以下是笔者当时设计的方案:根据他们每天的消息量,在kv存储中固定划分了数百个queue,对应上百个key。这样可以保证每个key对应的数据结构中不会写入过多的消息,也不会频繁写入少数几个key。一旦MQ发生故障,可靠的消息服务可以通过hash算法将每条消息均匀的写入数百个固定key对应的kv存储队列中。同时,此时需要通过zk触发一个降级开关,整个系统在MQ块中的读写都会立即降级。3、下游服务消费MQ的降级感知下游服务消费MQ也是通过自己封装的组件来完成的。这个时候如果组件从zk感知到降级开关打开了,它会先判断是否可以继续从MQ消费数据?如果没有,就开启多个线程,并发从kv存储中的数百个预设队列中获取数据。每获取一条数据,就交给下游服务的业务逻辑去执行。通过这种机制,实现了MQ故障时的自动故障感知和自动降级。如果系统的负载和并发不是很高,使用这个方案一般是没有问题的。因为在生产落地的过程中,包括大量的容灾演练和实际生产故障发生时的表现,可以有效保证业务流程在MQ故障发生时继续自动运行。4、故障自动恢复如果降级开关打开,自封装组件需要开启一个线程,每隔一段时间尝试向MQ发送消息,看是否恢复。如果MQ已经恢复,可以正常投递消息,此时可以通过zk关闭降级开关,然后可靠的消息服务继续向MQ投递消息。下游服务确认kv中存储的各个队列没有数据后,可以再次进行切换。使用来自MQ的消息。5.更多的业务细节其实上面说的这套方案主要是一套通用的降级方案,具体的实施是结合各个公司不同的业务细节来决定的,很多细节无法在文章中体现。比如你要保证消息的顺序?是否涉及到需要根据业务动态生成大量的key?另外,这个方案的实施还是有一定成本的,所以建议大家尽量留在推送公司的基础架构团队,保证MQ99.99%的可用性不宕机。二是根据贵公司实际的高可用需求来决定。如果你觉得MQ偶尔宕机是可以接受的,那你就不需要实施这个降级方案了。但是,如果公司领导认为MQ中间件宕机后,业务系统进程还要继续运行,那么就要考虑一些高可用降级的方案,比如本文提到的那种。最后说一下,如果有的公司涉及到每秒几万、几十万的高并发请求,那么MQ降级方案的设计会比较复杂,远不是这么简单可实现的。