当前位置: 首页 > 后端技术 > Java

一起探讨,消息幂等(去重)通用解决方案

时间:2023-04-01 21:56:36 Java

消息中间件是分布式系统中常用的组件,无论是异步、解耦、调峰等,都有广泛的应用价值。我们通常认为消息中间件是一个可靠的组件——这里所谓的可靠性是指只要我把消息成功投递给消息中间件,消息就不会丢失,也就是消息至少会保证消息能被消费者接收成功消费一次是消息中间件最基本的特性之一,也就是我们常说的“ATLEASTONCE”,即消息至少会被“成功消费一次”。比如消息M发送给消息中间件,消息传递给消费程序A,A接收到消息,然后消费。但是,程序在消费到一半时重新启动。此时消息没有标记为消费成功,消息会继续传递给消费者,直到消费成功消息中间件才会停止传递。然而,由于这种可靠的特性,消息可能会被多次传递。比如刚才这个例子,程序A收到消息M完成消费逻辑后,想要通知消息中间件“我已经消费成功”时,程序重启,所以对于消息中间件来说,这条消息还没有被成功消费,所以他会继续交付。此时对于应用A来说,貌似消息消费成功了,但是消息中间件还是重复投递。在RockectMQ场景下,这意味着相同messageId的消息会被重复投递。可靠的基于消息的传递(消息不丢失)具有更高的优先级,因此消息不重的任务将转移到应用程序的自我实现,这就是为什么RocketMQ的文档强调消费逻辑需要自我实现实现幂等。其背后的逻辑其实是:不丢失和不重复是矛盾的(在分布式场景下),但是消息重复是有解决办法的,消息丢失很麻烦。简单的消息去重方案举例:假设我们业务的消息消费逻辑是:插入一个订单表的数据,然后更新库存:insertintot_ordervalues.....updatet_invsetcount=count-1其中good_id='good123';为了实现消息的幂等性,我们可能会采用这样的方案:select*fromt_orderwhereorder_no='order123'if(order!=null){return;//消息重复,直接返回}这个对于很多情况下,可以确实有很好的效果,但是在并发的场景下,还是会出现问题。并发和重复消息假设这次消费的所有代码一起耗时1秒,在这1秒内(假设100毫秒)有重复的消息到达(比如producer快重传,Broker重启等),那么就很很可能上面会在重代码中,会发现数据还是空的(因为最后一条消息还没有被消费,订单状态也没有成功更新),那么就会穿透checkbaffle,并且最终重复的消息消费逻辑会进入到非幂等安全的业务代码中,从而导致重复消费的问题(比如主键冲突抛出异常,库存重复扣除不释放等),并发去重的解决方案之一就是解决上述并发场景下的消息幂等性问题,一个比较理想的解决方案是开启事务,将select改为selectforupdate语句来锁定记录。select*fromt_orderwhereorder_no='THIS_ORDER_NO'forupdate//开始交易if(order.status!=null){return;//消息重复,直接返回}但是这个消费的逻辑会导致整个消息消费可能需要更长的时间,并发减少。当然还有其他更高级的解决方案,比如更新订单状态的乐观锁,更新失败消息重新消费等。但是,这需要针对具体的业务场景进行更加复杂细致的代码开发和库表设计,不在本文讨论范围之内。但是无论是selectforupdate还是乐观锁,这些方案实际上都是基于业务表本身去重,这无疑增加了业务开发的复杂度。业务系统中很大一部分请求处理依赖于MQ,如果每个消费逻辑本身都需要基于业务本身进行去重/幂等开发,这是一个繁琐的工作量。本文希望探索一种通用的消息幂等处理方法,从而抽象出一定的工具类,适用于各种业务场景。ExactlyOnce在消息中间件中,有一个传递语义的概念,这个语义有一个概念叫做“ExactlyOnce”,即消息一定会被消费成功,并且只会被消费一次。以下是阿里云对ExactlyOnce的解释:Exactly-Once是指发给消息系统的消息只能被消费者处理,只能处理一次。末端也只消耗一次。在我们业务消息的幂等处理领域,可以认为业务消息的代码肯定会执行,而且只执行一次,所以可以考虑ExactlyOnce。但是在分布式场景下几乎不可能找到通用的解决方案。但是,如果是基于数据库事务的消费逻辑,其实是可行的。基于关系数据库事务插入消息表。假设我们业务的消息消费逻辑是:更新MySQL数据库中一个order表的状态:updatet_ordersetstatus='SUCCESS'whereorder_no='order123';要实现ExacltyOnce,这条消息只被消费一次(而且必须保证消费一次),我们可以这样做:在这个数据库中添加一个消息消费记录表,将消息插入到这个表中,把原来的orderupdate和这个插入动作放到同一个事务中一起Commit可以保证消息只会被消费一次。1.开启事务2.插入消息表(处理主键冲突问题)3.更新订单表(原消费逻辑)4.提交事务说明:1.如果消息消费成功,则提交事务此时,则打开消息表,如果插入成功,即使RocketMQ没有收到消费站点的更新,再次投递,也会插入消息失败,视为消费完毕,然后消费站点将直接更新。这确保我们的消费代码只会被执行一次。2、如果在事务提交前服务挂掉(比如重启),本地事务没有执行,所以没有更新顺序,消息表没有插入成功;对于RocketMQ服务端来说,消费位置没有更新,所以消息还是会继续投递,投递后发现这条消息成功插入到消息表中,所以可以继续消费。这样可以确保消息不会丢失。事实上,阿里云ONS的EXACTLY-ONCE语义的实现与这种基于数据库事务特性的方案类似。更详细的可以参考:https://help.aliyun.com/document_detail/102777.html基于这个方法,确实可以扩展到不同的应用场景,因为它的实现方案与具体业务本身——而是依赖于消息表。但它的局限性如下:1.消息的消费逻辑必须依赖于关系数据库事务。如果消费过程中涉及到其他数据修改,比如Redis这种不支持事务特性的数据源,这些数据是无法回滚的。2、数据库的数据必须在同一个数据库中,不能解决跨库重发。阿里云对消息的去重只是RocketMQ的messageId,在生产者出于某种原因手动重发的场景下(比如上游多次请求事务),无法达到去重/幂等的效果(由于不同的消息编号))。更复杂的业务场景如前所述,通过这种方式实现ExactlyOnce语义其实有很多局限性,使得这种方案基本上不值得广泛应用。并且因为是基于事务的,可能会导致锁表时间过长等性能问题。例如,我们以一个比较常见的订单申请消息为例。可能有以下几个步骤(以下统称为步骤X):1.查库存(RPC)2.锁定库存(RPC)3.开启事务插入订单表(MySQL)4.调用其他一些下游服务(RPC)5.更新订单状态6.提交事务(MySQL)这种情况下,如果我们采用消息表+本地事务的实现方式,消息消费过程中会有很多子流程,不支持回滚,也就是说,即使我们添加了一个事务,它背后的操作也不是原子的。怎么说呢,也就是说,有可能是第一个物品经过第二步锁定库存的时候,服务重启了。这个时候,库存实际上已经锁定在另一个服务中,无法回滚。当然,消息将再次传递。需要保证消息至少可以被消费一次。也就是说,本身锁定库存的RPC接口还是需要支持“幂等”的。再者,如果在这种耗时较长的链场景下加入交易包,会大大降低系统的并发度。所以一般情况下,我们在这种场景下处理消息去重的方式还是采用业务自己实现开头提到的去重逻辑的方式,比如在前面加上selectforupdate,或者使用乐观锁。那么有没有办法让我们提炼出一个能够兼顾去重、通用、高性能的公共方案呢?拆解消息执行过程中的一个思路是将上述步骤拆解成若干个不同的子消息,例如:1.库存系统消费A:查询库存并锁定库存,发送消息B给订单服务2.订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费3、下游系统消费消息C:处理一些逻辑,发送消息D给订单系统4、订单系统消费消息D:更新订单状态注意:以上步骤需要保证本地事务和消息是一个事务(至少最终一致),涉及到分布式事务消息相关的话题,本文不做讨论。可以看出,这样的处理方式会让操作的每一步都更加原子化,而原子就是小事务,小事务意味着使用消息表+事务的方案是可行的。然而,这太复杂了!这就把一个原本连续的代码逻辑拆分成了多个系统的多个消息交互!这不如在业务代码级别锁定。更通用的解决方案。上面的消息表+本地事务方案有其局限性和并发性的短板。根本原因在于它依赖于关系数据库的事务,事务必须包裹在整个消息消费中。链接。如果我们能够不依赖事务来对消息进行去重,那么这个解决方案可以扩展到更复杂的场景,比如RPC、跨库等。比如我们仍然使用消息表,但是不依赖事务,而是增加消费状态到消息表。这能解决问题吗?基于消息幂等表的非事务化方案67\_1.png以上就是去事务化消息幂等方案的流程。可以看出,这个方案没有事务,只是区分了消息表本身的状态。:消费,消费完成。只有消费过的消息才会被幂等处理。对于已有消费中的消息,后面重复的消息会触发延迟消费(发送到RocketMQ场景中的RETRYTOPIC)。之所以触发延迟消费,是为了控制并发场景。第二条消息在第一条消息未完成时,控制消息不丢失(如果直接幂等,则消息(消息id相同)会丢失,因为如果最后一条消息没有被消费,已经告诉broker第二条消息成功了如果此时第一条消息失败,broker不会重新投递)以上过程就不细说了,后面会给出github源码地址.读者可以参考源码的实现。这里我们回头看看一开始的想法问题是否解决:1.消息消费成功,直接幂等处理第二条消息(消费成功)。2.并发场景下的消息仍然可以满足无消息重复的问题,即穿透幂等挡板的问题。3.支持上游业务生产者反送重复业务消息的幂等问题。第一个问题显然已经解决了,这里就不展开讨论了。如何解决第二个问题?主要是通过插入消息表的动作来控制的。假设我们使用MySQL作为消息表的存储介质(设置消息的唯一ID作为主键),那么只有一条消息会插入成功,后续的消息插入都是由于主键。因为冲突而失败,导致分支延迟消费,然后后面的延迟消费就会变成上面第一个场景的问题。关于第三个问题,我们只要设计去重的messagekey支持业务的主键(比如订单号,请求序列号等),而不仅仅是messageId。所以这不是问题。这种情况下是否存在消息丢失的风险?细心的读者可能会发现,这里其实存在一个逻辑漏洞。问题出在上面讨论的三个问题中的第二个问题(并发场景)。在并发场景下,我们依靠消息状态来做并发控制。使第2条消息重复的消息将不断延迟消费(重试)。但是如果此时由于某些异常原因(比如机器重启、外部异常导致消费失败)导致第一条消息没有消费成功怎么办?也就是说,此时延迟消费其实每次下来都看到_consuming_的状态,最终消费会被视为消费失败,投递到死信Topic(RocketMQ默认可以消费16次)。你有这个顾虑是对的!对此,我们的解决方案是,插入的消息表必须有一个最大消费过期时间,比如10分钟,也就是说如果一条消息在消费超过10分钟,就需要从消息表中删除(需要程序本身)。所以最后一条消息的处理过程会是这样的:67\_2.png更灵活的消息表存储介质我们的方案其实是没有事务的,只需要一个中心存储介质,自然可以选择更灵活的存储介质,比如作为Redis。使用Redis有两个好处:1.性能损失更低。2、我们上面说的超时,可以直接使用Redis自带的ttl来实现。自己选择。源码:RocketMQDedupListener以上解决方案已经开源了RocketMQ的Java实现,放在了Github中。具体的使用文档可以参考https://github.com/Jaskey/Roc...,下面只是ReadmeExample中使用Redis去重的帖子,展示一下使用这个工具是多么的简单messages去重业务中的幂等性://使用Redis作为幂等表);//大多数情况下可以直接使用消费组名StringRedisTemplatestringRedisTemplate=null;//这里省略了获取StringRedisTemplate的过程);consumer.registerMessageListener(messageListener);consumer.start();以上代码大部分是原版RocketMQ的必备代码,唯一需要修改的是创建一个DedupConcurrentListener实例,在这个实例中指定你的消费逻辑和去重业务key(默认为messageId)。更多使用细节请参考Github上的说明。这个实现是一劳永逸的吗?至此看来,解决方案还是比较完美的。所有消息都可以快速访问和去重,与具体的业务实现完全解耦。那么这样是不是就完美的完成了去重的所有任务呢?不幸的是,事实并非如此。原因很简单:因为至少要保证消息消费成功一次,那么有可能消息消费到一半时消息失败触发消息重试。还是用上面的订单流程X:1.查库存(RPC)2.锁库存(RPC)3.开启一个事务插入订单表(MySQL)4.调用其他一些下游服务(RPC)5.更新订单status6,committransaction(MySQL)当消息被消费到第3步时,我们假设是MySQL异常导致失败,并触发消息重试。因为我们在重试之前会删除幂等表的记录,所以在重试消息时会重新输入消费代码,然后再次执行步骤1和步骤2。如果步骤2本身不是幂等的,那么业务消息消费仍然不是完全幂等的。这个实现的价值?那么既然这样并没有完全完成消息的幂等性,那么它的价值是什么呢?物超所值!虽然这不是解决消息幂等性的灵丹妙药(其实在软件工程领域基本上没有灵丹妙药),但是可以很方便的解决:1.broker引起的各种消息重投,负载均衡等重复问题2.各个上游生产者造成的业务级消息重复。3、并发消费重复消息的控制窗口问题。即使重复,也不可能同时进入消费逻辑。消息去重的另外一些建议是,使用这种方式可以保证在正常的消费逻辑场景下(无异常,无异常退出),消息的所有幂等工作都可以解决,无论是业务重复还是rocketmq特性造成的重复。其实这样已经可以解决99%的消息重复问题了,毕竟异常场景肯定是有的。那么,如果想在异常场景下很好地处理幂等问题,可以通过以下方式来降低问题率:1.消息消费失败时做回滚。如果消息消费失败本身有回滚机制,那么消息重试自然没有副作用。2、消费者要做好优雅退出处理。这是为了尽可能避免程序退出一半消息消耗导致的消息重试。3、一些不能幂等的操作,至少要终止消费并报警。比如锁存货的操作,如果统一业务流锁存货一次成功,再触发锁存货,如果不能做到幂等处理,至少要在消息消费上触发异常(比如消费异常导致通过主键冲突等)4.在#3的前提下,做好消息消费监控,当发现消息重试不断失败时,手动回滚#1,使下次重试消费成功。