前言在我的工作中,我为很多业务使用了消息中间件MQ。我在一家交通行业的公司工作,业务中经常涉及处理一些非法数据的场景。RabbitMQ在项目中经常使用,今天想和大家聊一聊如何避免消息丢失和重复消费。如果我们有消息的时候MQ服务器突然宕机了怎么办?我们过去发送的所有消息都消失了吗?是的,一般的MQ中间件为了提高系统的吞吐量,都会将消息保存在内存中。如果不做其他处理,一旦MQ服务器宕机,所有的消息都会丢失。这是业务不允许的,影响很大。遇到这类问题,一般有以下几种处理方式。如何避免消息丢失和持久化在RabbitMQ中发送消息时,会有一个持久化参数可以设置,如果设置为true,则持久化。这样的话,即使MQ服务器宕机,重启后磁盘文件中也会有消息保存,不会丢失。是的,这样的话,有一定的概率消息不会丢失。但是也会有一种场景,就是消息刚保存在MQ内存中,还没来得及更新到磁盘文件中,突然就死机了。这种场景在大量消息持续传递的过程中非常常见。那么该怎么办?我们如何确保它会持久化到磁盘?confirm机制RabbitMQ通过confirm机制来通知我们持久化是否成功?confirm机制的原理:(1)消息生产者向MQ发送消息。如果消息接收成功,MQ会向生产者返回ack消息;(2)如果消息没有接收成功,MQ会返回一个nack消息给producer;这能保证100%的消息不会丢失吗?如果我们的producer每次都发消息,必须要MQ持久化到磁盘,然后才会发起ack或者nack的回调。这样的话,是不是我们MQ的吞吐量就很低了,因为每次都要把消息持久化到磁盘上。写入磁盘非常慢。这在高并发场景下是不能接受的,吞吐量太低了。因此,MQ持久化磁盘的真正实现是通过异步调用来处理的。它有一定的机制,比如:当有几千条消息时,一次性刷入磁盘。而不是每次有消息来就刷盘。所以comfirm机制其实是一种异步的监听机制,保证系统的高吞吐量,这就导致现在仍然无法100%保证消息不丢失,因为即使加上了confirm机制,MQ内存中尚未刷新消息。Disk-to-diskdown了,还是处理不了。数据库事务机制生产者在下发消息前可以在本地数据库中建立消息表,先将消息持久化到Redis或DB中,这样就可以使用本地数据库的事务机制。事务提交成功后,将消息表中的消息传送到消息队列中。confirm机制监控消息是否发送成功?如果确认消息成功,则在数据库中删除此消息。如果nack报文不成功,可以根据自己的业务选择是否重发报文。您也可以自行决定删除此消息。如何避免消息重复消费幂等性就是一个业务在相同条件下的操作,无论操作多少次,结果都是一样的。重复消费的问题?重复消费的原因可能出现在生产者身上,也可能出现在MQ或消费者身上。这里说的重复消费问题是指同一条数据被执行了两次,不仅是MQ中的一条消息被消费了两次,而且MQ中有两次相同的消费。生产者:生产者可能会重复向MQ推送一条数据。为什么会这样?可能是一个Controller接口被重复调用了两次,接口不是幂等的;也有可能是向MQ推送消息时响应慢,生产者的重试机制导致消息再次推送。MQ:当consumer消费一条数据,响应ack信号消费成功,MQ突然挂掉,导致MQ认为consumer还没有消费这条数据,MQ回收后再次推送消息,造成重复消费。Consumer:消费者消费完一条消息,准备向MQ发送ack信号。这时,消费者挂断了。服务重启后,MQ认为消费者没有消费消息,重新推送消息。如何保证幂等性?消费者如何解决重复消费的问题?这里提供两种方法:状态判断法:消费者消费数据后,将消费数据记录到redis中,下次消费时检查redis中是否存在该消息。如果存在,则说明该消息已经被消费,直接丢弃该消息。业务判断方法:通常,数据消费后,需要插入到数据库中,利用数据库的唯一性约束,防止重复消费。每次消费直接尝试插入数据。如果提示unique字段重复,则消息直接丢失。一般来说,这种业务判断方式可以简单高效的避免消息的重复处理。
