当前位置: 首页 > 科技观察

来自灵魂的提问:重复消费才能消费分布式事务

时间:2023-03-18 01:08:44 科技观察

大家好。我是你学习成长的伙伴船长。我们继续学习RocketMQ。在上一篇文章中,我们了解了广播消息、延迟消息、批量消息和过滤消息。这些都是在RocketMQ的这篇文章中,我们将继续学习RocketMQ中的奇葩特性,让大家在开发中乐在其中。在这篇文章中,我们将讨论重复消费和顺序消费这两个消息队列中常见的问题,还有一种事务消息,这种事务消息可以在消息队列中完成分布式事务的特性。将之前与这些技术点相关的文章贴在这里。可以先阅读了解什么是RocketMQ。我不知道为什么。RocketMQ生产者有这么多用途?(图在最后,不用谢)面试官问我:什么是分布式事务?像这样,应该是面试领域非Ban必选的技术点,除非面试官忘记了,否则大概率会问到这些问题相关的技术栈,到时候充分发挥大家的技术知识和技术深度。这种问题应该是不可避免存在的,也是大家在使用消费队列时必须要考虑的问题之一。反正我是先用消息队列来考虑这个问题的,因为如果不考虑这个问题,可能会造成无法接受的业务问题。重复消费的问题,大家一定要明白什么意思,就是同一条消息被消费了很多次。为什么说这种问题一定存在呢,因为消息队列肯定有它的重试机制,也就是消息重发。一旦消费端出现异常在某些情况下,消息队列会重新发送消息。可以重发消息,重新处理,但是一般一条消息的监听器不止一个,也就是可能有多个系统在监听和处理这条消息。如果其他系统不支持重复消费岂不是很糟糕?其他系统中的数据会混乱,系统之间的数据会不一致。比如电商系统中的支付成功消息,支付成功后发送一条消息,积分系统和物流系统中的多个系统监听到这条消息,积分系统处理异常,出现一条关于支付成功被拒。如果物流系统不支持消息重试,出现了两个物流订单,那么可能是客户购买了一件商品,支付了一件商品,最后给用户发送了多份该商品。啊……那会不会很糟糕?不妨收拾一下其实消息重试真的是一个很常见的情况,也是大家在使用消息队列时必须要考虑的问题,比如网络抖动,系统业务处理bug等,如果不解决这个问题,系统就会后患无穷。如何避免这种双重消费问题?解决方案:幂等幂等性是一个数学概念。通俗的解释就是同一个参数多次调用同一个接口,调用的结果都是一样的,就是你发送了多少次支付成功的消息,最后生成的物流数据还是这样的没问题,那么如何保证幂等性呢?这类问题我一般分两种场景来回答,一种是生产端幂等,一种是消费端幂等。它是通过第三方存储完成的,比如Redis,或者流量计。消息发送后,暂时保存记录。在下次发送消息之前,先在Redis中检查消息是否已经发送,但这在很多场景中是不合适的。这会限制生产端的重试机制。如果生产端发送成功,但消费失败,则不会重发消息。消费端的另一种幂等性属于最常见的,无论生产者发送多少次相同的消息,最终的执行结果都是一样的。可以分为强幂等和弱幂等来处理。强幂等其实是用在必须幂等的业务场景中。允许出错,这个比较谨慎,比如上面支付成功的消息,物流消费者的处理一定要强幂等这里可以引入三方存储,可以是流量计,也可以是Redis,支付完成后successful,记录在流表中,这里使用Redis可能会丢失,将支付成功和记录到流表中放到同一个事务中,要么一起成功,要么一起失败,每条消息过来后,去流表查看根据订单号是否有这个stream,有stream直接返回即可。也可以直接使用数据库的唯一约束做插入操作,属于弱幂等性,不能保证100%的幂等性,比如用Redis存储业务ID作为唯一键。Redis宕机可能会导致短信发送丢失,但问题不大,用户可以接受。我们来看示例代码StringidempotentValue=RedisUtil.get(RedisConstant.IDEMPOTENT.concat(msgId),String.class);if(!StringUtils.isEmpty(idempotentValue)){log.info("========消息已被消费:【{}】",msgBody);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//业务代码//幂等处理RedisUtil.setEx(RedisConstant.IDEMPOTENT.concat(msgId),"1",5,TimeUnit.DAYS);02顺序消费那告诉我,你遇到过顺序消费的场景吗?顺序消费的场景其实不是特别常见,但是也是必不可少的,因为在一些业务场景中,顺序对于保证消息的消费顺序是非常重要的。比如我们有一个操作需要对数据进行增删改查。在这个一般的系统中,我们会使用SQL来操作,但是当数据量非常大的时候,我们在做备份和同步数据的时候,有时候这种同步会通过消息队列慢慢的执行。这时候就需要保证消息的顺序。如果把上面三个操作变成了修改、删除、添加的顺序的话,就不是我们想要的效果了。当然,普通消息的消费没有固定的顺序。发送消息时,默认通过轮询的方式发送到不同的分区。消费者消费时,会分配到多个分区。同时拉取多个分区并提交消费。在同一个partition的队列中,可以保证FIFO,但是无法做到普通消息的顺序。对于消费,只需要将消息投递到同一个队列即可。根据上面,我们只需要保证将需要保持有序的消息投递到同一个队列中,这样同一个队列中的消息肯定会投递到同一个消费者实例中,同一个消费者实例必须拉取顺序消息,然后顺序消费,即使触发重排后队列分配给其他消费者也无所谓,因为队列的消息永远是FIFO,所以只需要保证重复的幂等性即可消息的消费。队列内部顺序还是没问题的。全局的消费分配顺序和分区顺序是全局的:对于一个指定的topic,所有消息都遵循严格的先进先出FIFO(FirstInFirstOut)顺序发布和消费分区顺序:对于一个指定的Topic,所有的消息根据ShardingKey进行分区。同一分区中的消息按照严格的FIFO顺序发布和使用。ShardingKey是顺序消息中用来区分不同分区的key字段,与普通消息的Key是完全不同的概念。为什么globalsoul虚拟时序消息的消费性能一般globalsequentialmessage都是严格按照FIFO的消息阻塞原则,即如果上一条消息没有消费成功,那么下一条消息会一直存储在topic中队列。如果想提高全局时序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少本地业务逻辑的耗时处理。在rocketmq中,一个topic下有多个queue,所以为了保证消息的顺序,消息都发送到同一个queue。Rocketmq提供了MessageQueueSelector队列选择机制。使用Hash取模的方法有3种实现方式让需要顺序消费的消息发送到同一个队列,然后同步发送。当然,这个取模是基于这些消息的共同属性。Rocketmq只保证发送的顺序性。至于最后的顺序消费,还是要消费者业务来保证,我保证我发给你的消息是有序的,但是如果你自己处理的乱七八糟的话,rocketmq不关我的事,是你的事自己的代码问题。其实还是有一些不正常的场景。导致乱序的情况,比如master宕机,导致写队列数量发生变化。你想,如果使用上面的哈希取模,消息会被分散到其他队列中,这样顺序就无法保证了,除非选主,否则挂了就无法发送下一条消息。成功,或者全部执行失败;而分布式事务是跨机器、跨服务、跨系统的事务保证。目前的系统拆分成很多服务,每个服务至少部署两个,分别部署在不同的机器上这样的系统之间的事务保证是分布式事务,而rocketmq中的事务消息天然支持分布式事务事务消息:实现类似X或OpenXA的分布式事务功能实现消息队列的最终一致性RocketMQ版提供了类似X或OpenXA的分布式事务功能,可以通过消息队列RocketMQ版的事务消息实现分布式事务的最终一致性。半事务消息:暂时无法传递的消息。发送方已经成功将消息发送到消息队列的RocketMQ服务器,但是服务器还没有收到生产者对消息的二次确认。此时消息被标记为“暂时无法投递”,该状态下的消息为半事务消息。消息回溯:由于网络断开、生产者应用重启等原因,某笔交易消息的二次确认丢失。当消息队列RocketMQ版本服务器通过扫描发现某条消息长期处于“半事务消息”中时,需要主动向消息生产者查询该消息的最终状态(Commit或回滚),查询过程为消息回滚。和Captain一起看看交易消息发送步骤:1、发送方发送半交易消息给服务端Broker,服务端将消息持久化。成功后会返回ACK确认消息发送成功。此时消息为Semi-transactionalmessage2.sender开始执行本地事务的逻辑3.sender会根据本地事务的执行结果向server提交二次确认,决定是否Commit或回滚。服务端收到Commit后,将消息标记为可用Delivery,发送给消费者;服务器收到Rollback后会删除半事务消息,服务器不会发送,消费者也不会收到。但是,如果网络断开或者应用重启,上面的步骤会重复两次,确认信息无法到达服务器怎么办?这里其实有一个checkback机制。发送方发送消息后,需要在本地执行交易。如果交易执行过程卡住,或者交易执行结果无法将交易结果传递给服务端,服务端会执行checkback机制来确认半交易消息的最终提交