灵魂拷问为什么MQ消息的消费有时需要幂等?你们都说版本号可以用来解决幂等消费?消息幂等消费的根本问题是什么?随着系统复杂度的不断增加,大部分系统都会引入MQ来进行解耦。其实从引入MQ的初衷来看,大部分系统都是为了解耦多模块带来的复杂度,但是有“架构师”说:为了解决性能问题。..当然,也不排除MQ有流量削峰的功能。我只是说大多数系统引入MQ的初衷应该是系统解耦。当一个大的单体系统逐渐拆分成多个小系统,也就是所谓的微服务拆分之后,几乎所有微服务之间的通信和分布式事务都需要MQ的支持。充分体现了MQ在分布式系统中的重要性。此时整个系统之间的交互类似于下图所示的图像制作消息。既然引入了MQ组件,那肯定意味着同时有消息的生产者和消费者,这也是典型的订阅模式。在消息数据的整个生命周期中,会依次经历producer=”MQ=”consumer这三个主要部分。从生产者的角度来看,消息的可靠传递是首要任务。由于网络的不可靠性,理论上不可能100%成功传递消息。对于这种情况,一般的解决方法是重传消息。当然,重传机制不是无限重传。可以根据业务制定具体的重传策略。例如可以设置最大重传次数为10次,重传间隔依次递增。这种方案虽然简单,但是它的副作用就是消息重复投递的问题。为什么需要幂等性消费幂等性是一个数学概念理论,意思是对同一个操作执行多次,同一个操作执行一次,最终得到的结果是一样的。给你举个不当正经却很准确的栗子:你女朋友出轨过一次,出轨过多次。对于你来说,结果其实是一样的:你被骗了。所以出轨一次和出轨多次的结果对你来说是一样的。对于MQ,退一步说,即使不存在MQ消息重复投递的问题,但是在消费者端的业务中,那些对消息消费比较敏感的业务,我们在设计程序的时候也要考虑消息的幂等性建筑学。考虑消费,例如:用户购买商品,赠送红包或积分的业务场景。这样的场景对消息的重复消费非常敏感。如果程序处理不当,就会出现反复给用户发红包的情况,估计程序员也得背锅来祭天了。幂等性其实很好。任何一个业务场景接口的幂等设计,都需要找出幂等性产生的数据标识。MQ消息的重复问题,从整个消息流的角度来看,可以从两个方向解决:避免在消息产生时传递重复的消息,即:消息生产者保证消息的唯一性MQ本身提供了重复的消息过滤function消息被消费时,避免重复消费image在消息被消费前的前半部分流程中,生产者可以使用唯一的消息id和ACK机制来保证消息被重复投递,但是这样会大大降低生产者业务的表现。通常,生产者需要异步发送MQ消息。如果发送的时候需要检查消息是否已经发送,这无疑不是一个好的设计,而且你检查的效果,只是命中了很小一部分数据,得不偿失,所以很少有人在生产者主动检查消息的重复投递。至于MQ内部,有些MQ确实提供了幂等存储设计,比如Kafka引入了ProducerID(即PID)和SequenceNumber。PID。每个新的Producer在初始化时都会被分配一个唯一的PID,这个PID对用户是不可见的。序列号。(对于每个PID,Producer发送的每条数据都对应一个从0开始单调递增的SequenceNumber,Broker端将这个seqnumber保存在缓存中。对于接收到的每条消息,如果它的sequencenumber比in中的sequencenumber高Broker缓存如果大于1则接受,否则丢弃。这样可以实现消息的重复提交。但是只能保证单个Producer是同一个ExactlyOnce语义。同一个Producer不能保证一个topic的不同部分是幂等的,但是这些都不是我们今天要讲的重点,在实际业务中,消息的幂等消费更倾向于在消费端完成,而问题在消息结束时就彻底解决了,不管是系统设计还是扩展性,无疑都是最好的,刚才说了,既然消息要幂等消费,jud的一个标识必须提供gingduplication,可以是自定义的消息ID,也可以是消息中几个字段的组合,看起来类似于数据表中的主键。目前主流的方式是在生产端根据业务特性生成消息id。比如你因为下单添加给用户积分的messageid,你可以根据userid_orderId_numberofpointsmessageid生成一个唯一的。有了唯一的消息id,消费者可以将消费过的消息id存储在本地,过滤重复的消息。当然,如果数据量比较大,可以删除很久以前的历史数据,或者转移到其他备份中,毕竟同样的消息时间久了是无法再次投递的。下面是一个本地消息表的例子:字段说明MsgId消息idCreateTime创建时间...其他有用的业务字段消费新消息时,执行如下类似于下面的sql语句,得到消息是否已经被创建的结果consumed判断当前消息是否需要重复消费selectcount(0)fromtablewhereMsgId='messageid'当然这里会有问题,如果只有一个consumer消费就没有问题,如果有多个消费者在并行消费,在判断重复消息的时候会需要锁来保证相同数据的顺序。这时候你可能需要分布式锁。郑重提示一下,除了生成messageid的方法,网上还有很多文章可以使用版本号来解决幂等性问题。试问:有多少人亲身实践过这个解法?今天我们给用户加分。我们来看看这个解决方案的方法:用户的评分表需要增加一个版本号(Version)字段。消息的生产者在消息传递中添加版本号字段。消费者根据消息的版本号执行sql。具体sql类似:updateusersetamount=amount+10,version=version+1whereuserid=100andversion=1对于同一条消息的重复投递,这样确实可以做到幂等消费。毕竟程序使用了数据库的锁机制来保证一致性。所以有什么问题?消息版本号问题所有的分布式系统都面临着同一个问题,那就是数据的一致性问题,MQ的消费场景也不例外。上面以用户加分为例,因为消息的生产者在传递消息时需要查询当前版本号,类似下面的sqlselectversionfromtablewhereuserid=100查询版本号信息时,会将版本号作为部分传递消息体的MQ,在并发的情况下会怎样?假设当前版本号为1:线程A查询版本号为1,然后下发版本号为1、消息id为x的消息。同时,线程B也查询了当前用户的版本,值也是1,然后投递了一条消息id为Y的消息。此时无论消费者是先消费消息X还是先消费消息Y,数据库的版本号会增加,这会导致另一个消息Consumptionfailedduetomismatchedversionnumbers。image本文转载自微信公众号《建筑师实践之路》,可通过以下二维码关注。转载本文请联系架构师修炼之路公众号。
