当前位置: 首页 > 科技观察

使用消息中间件时,如何保证消息只被消费一次?

时间:2023-03-16 12:31:18 科技观察

消息中间件应用广泛,常用于削峰填谷、系统解耦、异步处理等。异步处理可能是使用最多的场景。比如现在的技术博客站点都是采用积分制。用户发表文章后,可以获得想要的积分。为了提高系统的性能,给用户加分的操作可以异步处理,不需要放在同步进程中。我们可以将用户ID和需要加点的积分打包成一条消息下发给消息系统,异步处理加点操作。由于这种情况发生在不同的服务器之间,消息可能无法传递或处理,这可能会导致用户加分失败,还有一种可能是消息被重复发送,因此用户可能会重复加分。不管发生什么,都是不正常的。为了避免以上两种情况,我们需要尽量保证消息不丢失,消息只被消费一次。本文抛开具体的消息中间件不谈,从消息系统的通用层面来谈谈如何避免这两种情况。.1.确保消息不丢失。在一条消息从生产到消费的环节中,有3个地方可能会导致消息丢失,分别是:生产者向消息队列写入消息的过程中投递失败。消息在消息队列中,持久化失败。消费者消费消息的过程中出现异常。在消息生产过程中,投递失败的消息生产者和消息系统一般独立部署在不同的服务器上。两台服务器之间的通信必须通过网络来完成。网络不稳定,可能会出现抖动。然后数据可能会丢失。网络抖动有两种情况。消息生产过程中消息丢失场景一:消息传输到消息系统时出现网络抖动,数据直接丢失。场景二:消息已经到达消息系统,但是当消息系统向生产者服务器返回信息时,网络出现波动。这时候数据可能并没有真正丢失,很有可能是生产者认为数据丢失了。对于消息生产过程中丢失的消息,可以采用重新投递机制。当程序检测到网络异常时,将消息重新投递到消息系统。但是在场景2中,重新投递可能会造成数据重复。后面会提到如何解决这个问题。消息队列持久化失败消息系统无法持久化消息。消息一般存储在本地磁盘中。当然也有少数消息中间件支持数据持久化到数据库,所以消息系统的性能可能会受到限制。会摔倒。如果你对Redis的持久化有一定的了解,你会发现,Redis在持久化数据的时候,并不是每次有新的item加入就立即存储到本地磁盘中,而是将数据写入到操作的PageCache中系统第一。在此过程中,当满足一定条件时,PageCache中的数据会被刷新到磁盘中,因为这样可以减少对磁盘的随机I/O操作。我们知道随机I/O是非常耗时的,这也提高了系统性能,消息中间件也不例外,持久化也是用的这种方式。在某些极端情况下,PageCache中的数据可能会丢失,例如突然断电或机器异常重启。解决PageCache数据丢失的问题,可以采用集群部署的方式,尽可能保证数据不丢失。消费过程中存在消息丢失。消息在消费过程中也会丢失,消费过程中丢失的概率比前两种情况大很多。一个消息消费流程大致分为三步:消费者拉取消息,消费者处理消息,消息系统更新消费进度。图片说明第一步拉取消息可能会出现网络抖动异常,第二步处理消息可能会出现一些业务异常,导致流程无法完成。如果第一步和第二步出现异常通知消息系统更新消费进度的情况,那么这条失败的消息永远不会被处理,自然会丢失。事实上,我们的业务还没有跑完。为避免消费过程中消息丢失,可以在消息接收处理完成后更新消费进度。但是在极端情况下,会出现消息重复消费的问题。比如某个消息处理完后,消费者崩溃了。机器宕机了,此时消费进度还没有更新。消费者重启后,这条消息仍会被消费。2.保证消息只被消费一次。消息系统本身不能保证消息只被消费一次,因为消费本身可能会重复,下游系统开始重复拉取,重试失败导致的重复,补偿逻辑导致的重复可能导致重复消息,到确保消息只被消费一次可以通过使用幂等性来实现。幂等性是数学中的一个概念,即多次执行同一个操作,执行一次操作,最后的结果是一样的。从幂等性的概念可以看出,即使消息被多次执行,也不会对系统造成影响,那么在使用消息系统时如何保证幂等性呢?因为生产者和消费者都可能产生重复的消息,所以必须保证生产者和消费者两端的幂等性。为保证生产者的幂等性,在生产消息时,使用雪花算法为消息生成全局ID,并在消息系统中维护消息ID映射关系。如果映射表中已经存在相同的ID,则丢弃该消息。虽然消息发送了两次,但实际上只保存了一条消息,避免了消息重复的问题。生产者的幂等性与被选中者的消息中间件有关,因为大多数情况下消息系统不需要我们自己实现,所以幂等性不好控制,而消费者的幂等性就是我们制定人事管控的重点方向。在消费者端,可以从通用层和业务层进行幂等操作,这取决于我们的业务需求。一般层面,使用goodmessagegeneration生成的全局唯一ID。消息被成功处理后,这个全局ID被存储在数据中。在处理下一条消息之前,先从数据库查询全局ID是否存在。如果已经存在,则直接丢弃该消息。利用这个全局唯一的ID实现消息的幂等性,伪代码如下:booleanisIDExisted=selectByID(ID);//判断ID是否存在if(isIDExisted){return;//存在则直接返回}else{process(message);//如果不存在,处理消息saveID(ID);//存储ID}但是在极端情况下,这种方式还是会出问题。如果消息处理后还没有保存到数据库,消费者会崩溃如果重启电脑,重启后会重新获取消息。执行查询时,消息还没有被消费,仍然会执行两次消费。可以引入数据库事务来解决这个问题,但是会降低系统性能。如果对消息的重复消费没有特别严格的要求,直接使用这种不引入事务的通用方案就可以了。毕竟,这也是概率极低的事情。在业务层面,我们有更多的选择,比如乐观锁、悲观锁、内存去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。我们以乐观锁为例。比如我们要给一个用户加分。因为加分操作不需要放在主业务中,我们可以使用消息系统异步通知。要使用乐观锁,我们需要在点表中添加点。版本号字段。并且在生产消息的时候,先查询这个账号的版本号,和消息一起发送到消息系统。图片描述消费者获取到消息和版本号后,在执行更新积分操作的SQL时会携带版本号,类似:updatescoresetscore=score+20,version=version+1whereuserId=1andversion=1;消息消费成功后,version变为2,那么如果有重复的version=1的消息再次被consumer拉取,则SQL语句不会执行成功,从而保证了消息的幂等性。为了保证消息只被消费一次,我们需要关注消费者这一块,利用幂等性来保证消息只被消费一次。今天站在消息中间件的通用层面,讲了如何保证数据不丢失,只消费一次。希望今天的文章对大家的学习或者工作有所帮助。如果觉得文章有价值,请点个赞,谢谢。