当前位置: 首页 > 科技观察

消息幂等(去重)通用解决方案,写的好!

时间:2023-03-16 20:57:14 科技观察

消息中间件是分布式系统中常用的组件。无论是异步、去耦,还是调峰,都具有广泛的应用价值。我们通常认为消息中间件是一个可靠的组件——这里所谓的可靠性是指只要我把消息成功投递给消息中间件,消息就不会丢失,也就是消息至少会保证消息能被消费者接收成功消费一次是消息中间件最基本的特性之一,也就是我们常说的“ATLEASTONCE”,即消息至少会被“成功消费一次”。比如消息M发送给消息中间件,消息传递给消费程序A,A接收到消息,然后消费。但是,程序在消费到一半时重新启动。此时消息没有标记为消费成功,消息会继续传递给消费者,直到消费成功消息中间件才会停止传递。然而,由于这种可靠的特性,消息可能会被多次传递。比如刚才这个例子,程序A收到消息M完成消费逻辑后,想要通知消息中间件“我已经消费成功”时,程序重启,所以对于消息中间件来说,这条消息还没有被成功消费,所以他会继续交付。此时对于应用A来说,貌似消息消费成功了,但是消息中间件还是重复投递。在RockectMQ场景下,这意味着相同messageId的消息会被重复投递。可靠的基于消息的传递(消息不丢失)具有更高的优先级,因此消息不重的任务将转移到应用程序的自我实现,这就是为什么RocketMQ的文档强调消费逻辑需要自我实现实现幂等。其背后的逻辑其实是:不丢失和不重复是矛盾的(在分布式场景下),但是消息重复是有解决办法的,消息丢失很麻烦。简单的消息去重方案举例:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:insertintot_ordervalues.....updatet_invsetcountcount=count-1wheregood_id='good123';要实现消息的幂等性,我们可能会采用这样的方案:select*fromt_orderwhereorder_no='order123'if(order!=null){return;//重复的消息,直接返回}这样在很多情况下确实可以起到很好的效果,但是在并发场景下,还是会出问题。并发和重复消息假设这次消费的所有代码一起耗时1秒,在这1秒内(假设100毫秒)有重复的消息到达(比如producer快重传,Broker重启等),那么就很很可能上面会在重代码中,会发现数据还是空的(因为最后一条消息还没有被消费,订单状态也没有成功更新),那么就会穿透checkbaffle,并且最终重复的消息消费逻辑会进入到非幂等安全的业务代码中,从而导致重复消费的问题(比如主键冲突抛出异常,库存重复扣除不释放等),并发去重的解决方案之一就是解决上述并发场景下的消息幂等性问题,一个比较理想的解决方案是开启事务,将select改为s选择更新语句来锁定记录。select*fromt_orderwhereorder_no='THIS_ORDER_NO'forupdate//开启事务if(order.status!=null){return;//消息重复,直接返回}但是这次消费的逻辑会导致整个消息消费发生变化,因为到引入事务包Long,并发度下降。当然还有其他更高级的解决方案,比如更新订单状态的乐观锁,更新失败消息重新消费等。但是,这需要针对具体的业务场景进行更加复杂细致的代码开发和库表设计,不在本文讨论范围之内。但是无论是selectforupdate还是乐观锁,这些方案实际上都是基于业务表本身去重,这无疑增加了业务开发的复杂度。业务系统中很大一部分请求处理依赖于MQ,如果每个消费逻辑本身都需要基于业务本身进行去重/幂等开发,这是一个繁琐的工作量。本文希望探索一种通用的消息幂等处理方法,从而抽象出一定的工具类,适用于各种业务场景。ExactlyOnce在消息中间件中,有一个传递语义的概念,这个语义有一个概念叫做“ExactlyOnce”,即消息一定会被消费成功,并且只会被消费一次。以下是阿里云对ExactlyOnce的解释:Exactly-Once是指发给消息系统的消息只能被消费者处理,只能处理一次。末端也只消耗一次。在我们业务消息的幂等处理领域,可以认为业务消息的代码肯定会执行,而且只执行一次,所以可以考虑ExactlyOnce。但是在分布式场景下几乎不可能找到通用的解决方案。但是,如果是基于数据库事务的消费逻辑,其实是可行的。基于关系数据库事务插入消息表假设我们业务的消息消费逻辑是:更新MySQL数据库中一个订单表的状态:update_ordersetstatus='SUCCESS'whereorder_no='order123';要实现ExacltyOnce,这条消息只被消费一次(而且必须保证消费一次),我们可以这样做:在这个数据库中添加一个消息消费记录表,将消息插入到这个表中,把原来的顺序update和这个插入动作放到同一个事务中提交,保证消息只会被消费一次。开启事务插入消息表(处理主键冲突问题)更新订单表(原消费逻辑)提交事务说明:1.此时如果消息消费成功,事务提交,则消息表插入成功。此时,RocketMQ如果没有收到消费站点的更新,重新投递,消息也会插入失败,视为消费完毕,之后直接更新消费站点。这确保我们的消费代码只会被执行一次。2、如果在事务提交前服务挂掉(比如重启),本地事务没有执行,所以没有更新顺序,消息表没有插入成功;对于RocketMQ服务端来说,消费位置没有更新,所以消息还是会继续投递,投递后发现这条消息成功插入到消息表中,所以可以继续消费。这样可以确保消息不会丢失。事实上,阿里云ONS的EXACTLY-ONCE语义的实现与这种基于数据库事务特性的方案类似。基于这种方式,它确实具有扩展到不同应用场景的能力,因为它的实现与具体业务本身无关——它依赖于一个消息表。但这里有局限性。消息的消费逻辑必须依赖于关系数据库事务。如果消费过程中涉及到其他数据修改,比如Redis这种不支持事务特性的数据源,这些数据是无法回滚的。数据库的数据必须在一个数据库中,不能跨数据库解决。阿里云对消息的去重只是RocketMQ的messageId,在生产者出于某种原因手动重发的场景下(比如上游多次请求事务),无法达到去重/幂等的效果(由于不同的消息编号))。更复杂的业务场景如前所述,通过这种方式实现ExactlyOnce语义其实有很多局限性,使得这种方案基本上不值得广泛应用。并且因为是基于事务的,可能会导致锁表时间过长等性能问题。例如,我们以一个常见的订单申请消息为例。可能有以下几个步骤(以下统称为步骤X):检查库存(RPC)锁定库存(RPC)打开一个事务并插入到订单表(MySQL)调用一些其他下游服务(RPC)更新订单状态Committransaction(MySQL)这种情况下,如果我们采用消息表+本地事务的实现方式,消息消费过程中的很多子流程是不支持回滚的,也就是说,即使我们加上了事务,在事实上,这背后的操作不是原子的。怎么说呢,也就是说,有可能是第一个物品经过第二步锁定库存的时候,服务重启了。这个时候,库存实际上已经锁定在另一个服务中,无法回滚。当然,消息将再次传递。需要保证消息至少可以被消费一次。也就是说,本身锁定库存的RPC接口还是需要支持“幂等”的。再者,如果在这种耗时较长的链场景下加入交易包,会大大降低系统的并发度。所以一般情况下,我们在这种场景下处理消息去重的方式还是采用业务自己实现开头提到的去重逻辑的方式,比如在前面加上selectforupdate,或者使用乐观锁。那么有没有办法让我们提炼出一个能够兼顾去重、通用、高性能的公共方案呢?Java核心技术教程及示例源码:https://github.com/javastacks/javastack消息执行过程的一种拆解方式是将上述步骤拆解成若干不同的子消息,例如:InventorysystemconsumptionA:Check库存并锁定库存,发送消息B给订单服务订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费下游系统消费消息C:处理部分逻辑,发送消息D向订单系统消费消息订单系统D:更新订单状态注:以上步骤需要保证本地事务和消息是一个事务(至少是最终一致性),其中涉及主题相关到分布式事务消息,本文不做讨论。可以看出,这样的处理方式会让操作的每一步都更加原子化,而原子就是小事务,小事务意味着使用消息表+事务的方案是可行的。分享给大家:SpringBoot学习笔记,这一篇太全面了!然而,这太复杂了!这就把一个原本连续的代码逻辑拆分成了多个系统的多个消息交互!这不如在业务代码级别锁定。另外,多线程系列的面试题和答案都整理好了。微信搜索Java技术栈,后台发送:面试,网上可以看。更通用的解决方案。上面的消息表+本地事务方案有其局限性和并发性的短板。根本原因在于它依赖于关系数据库的事务,事务必须包裹在整个消息消费中。关联。如果我们能够不依赖事务来对消息进行去重,那么这个解决方案可以扩展到更复杂的场景,比如RPC、跨库等。比如我们仍然使用消息表,但是不依赖事务,而是增加消费状态到消息表。这能解决问题吗?以上基于消息幂等表的非事务化方案就是去事务化消息幂等方案的流程。可以看出这个方案是无事务的,只是区分了消息表本身的状态:consuming,consumingFinish。只有消费过的消息才会被幂等处理。对于已有消费中的消息,后面重复的消息会触发延迟消费(发送到RocketMQ场景中的RETRYTOPIC)。之所以触发延迟消费,是为了控制并发场景。第二条消息在第一条消息未完成时,控制消息不丢失(如果直接幂等,则消息(消息id相同)会丢失,因为如果最后一条消息没有被消费,已经告诉broker第二条消息成功了如果此时第一条消息失败,broker不会重新投递)以上过程就不细说了,后面会给出github源码地址.读者可以参考源码的实现。这里我们回头看看一开始的想法,是否解决了问题:消息已经消费成功,直接幂等处理第二条消息(消费成功)。并发场景下的消息仍然可以满足不重复消息的问题,即穿透幂等挡板。支持上游业务生产者反送重复业务消息的幂等问题。第一个问题显然已经解决了,这里就不展开讨论了。如何解决第二个问题?主要是通过插入消息表的动作来控制的。假设我们使用MySQL作为消息表的存储介质(设置消息的唯一ID作为主键),那么只有一条消息会插入成功,后续的消息插入都是由于主键。因为冲突而失败,导致分支延迟消费,然后后面的延迟消费就会变成上面第一个场景的问题。关于第三个问题,我们只要设计去重的messagekey支持业务的主键(比如订单号,请求序列号等),而不仅仅是messageId。所以这不是问题。另外,MySQL系列面试题和答案都整理好了。微信搜索Java技术栈,后台发送:面试,网上可以看。这种情况下是否存在消息丢失的风险?细心的读者可能会发现,这里其实存在一个逻辑漏洞。问题出在上面讨论的三个问题中的第二个问题(并发场景)。在并发场景下,我们依靠消息状态来做并发控制。使第2条消息重复的消息将不断延迟消费(重试)。但是如果此时由于某些异常原因(比如机器重启、外部异常导致消费失败)导致第一条消息没有消费成功怎么办?也就是说,此时延迟消费其实每次下来都看到消费的状态,最终消费会被视为消费失败,投递到死信topic(RocketMQ默认可以消费16次).你有这个顾虑是对的!对此,我们的解决方案是,插入的消息表必须有一个最大消费过期时间,比如10分钟,也就是说如果一条消息在消费超过10分钟,就需要从消息表中删除(需要程序本身完成)。所以最后一条消息的流程会是这样的:一个更灵活的消息表存储介质我们的方案其实是没有事务的,只需要一个中心存储介质,自然可以选择更灵活的存储介质,比如Redis。另外,Redis系列面试题和答案都整理好了。微信搜索Java技术栈,后台发送:面试,网上可以看。使用Redis有两个优点:较低的性能损失。上面我们说的超时可以直接使用Redis本身的ttl来实现。当然Redis存储的数据可靠性和一致性不如MySQL,需要用户自行选择。