当前位置: 首页 > 后端技术 > Java

RabbitMQ和Kafka:如何处理消息丢失问题

时间:2023-04-01 17:58:04 Java

本文主要讨论以下问题:如何处理消息丢失问题。数据丢失的问题可能出现在生产者、MQ、消费者。下面我们借鉴RabbitMQ和Kafka来分别分析。RabbitMQ生产者丢失了数据。producer在向RabbitMQ发送数据的时候,可能会因为网络问题什么的导致数据中途丢失。这时可以选择使用RabbitMQ提供的事务功能,即生产者在发送数据前打开RabbitMQ事务channel.txSelect,然后发送消息。如果消息没有被RabbitMQ成功接收,生产者会收到异常错误,这时可以回滚事务channel.txRollback,然后重试发送消息;如果收到消息,则可以提交事务channel.txCommit。//开启事务channel.txSelecttry{//在这里发送消息}catch(Exceptione){channel.txRollback//在这里再次发送这条消息}//提交事务channel.txCommit但问题是,RabbitMQ事务机制(同步)一次做完了,吞吐量基本会下降,因为太耗性能了。所以一般来说,如果要保证写入RabbitMQ的消息不丢失,可以开启confirm模式。在producer上设置了confirm模式后,你写的每条消息都会被分配一个唯一的id,然后如果你写在RabbitMQ中,RabbitMQ会给你返回一个ack消息,告诉你消息是ok的。如果RabbitMQ处理消息失败,会回调你的其中一个nack接口告诉你消息接收失败,你可以重试。并且你可以结合这个机制来维护每个消息id在内存中的状态。如果一定时间后还没有收到这条消息的回调,可以重新发送。事务机制和confirm机制最大的区别在于事务机制是同步的。你提交交易后,它会被阻塞在那里,但确认机制是异步的。当你发送完一条消息后,你可以发送下一条消息,然后RabbitMQ接收到消息后,会异步回调你的其中一个接口,通知你消息已经接收到。所以producer一般会使用confirm机制来避免数据丢失。RabbitMQ丢失数据是指RabbitMQ自身丢失数据。必须开启RabbitMQ的持久化,即消息写入后,会持久化到磁盘。即使RabbitMQ自己挂了,恢复后也会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况是RabbitMQ在坚持之前就挂掉了,这可能会造成少量数据丢失,但这种概率很小。设置持久化有两个步骤:创建队列时,将其设置为持久化,这样RabbitMQ可以持久化队列的元数据,但不会持久化队列中的数据。二是在发送消息时设置消息的deliveryMode为2,设置消息持久化。这时候RabbitMQ会将消息持久化到磁盘中。这两个持久化必须同时设置。即使RabbitMQ挂了再重启,它也会重启并从磁盘恢复队列,恢复队列中的数据。注意,即使你开启了RabbitMQ的持久化机制,也有可能这条消息写到RabbitMQ,但是还没有来得及持久化到磁盘。不幸的是,此时RabbitMQ挂掉了,会导致内存中的数据丢失一点点。所以持久化可以和生产者端的confirm机制结合起来。只有消息持久化到磁盘后,producer才会收到ack通知,所以还没持久化到磁盘,RabbitMQ就挂了,数据丢失了,producer收不到ack,也可以自己重发。消费端丢数据RabbitMQ如果数据丢了,主要是因为你消费的时候刚刚消费,还没有处理。结果进程挂了,等重启,那就尴尬了。RabbitMQ认为你已经消费了它。数据丢失。这个时候就得用到RabbitMQ提供的ack机制了。简单的说,你必须关掉RabbitMQ的自动ack,可以通过一个api调用,然后每次在自己的代码中确保处理完成,然后在程序Bundle中进行ack。这样的话,你还没有处理完,不就没有ack了吗?那么RabbitMQ就认为你还没有处理完。这时RabbitMQ会把这个消费分配给其他消费者处理,消息不会丢失。总结一下:Kafka消费端丢数据。唯一可能导致消费者丢失数据的情况是,你消费了这条消息,然后消费者自动提交了一个offset,让Kafka认为你已经消费了这条消息,但实际上你刚要处理这条消息,还没处理就挂了,这时候这条消息就会丢失。这不是类似于RabbitMQ吗?大家都知道Kafka会自动提交offsets,所以只要关闭自动提交offsets,处理完后再手动提交offsets,就可以保证数据不会丢失。但此时,仍有可能出现重复消费。比如你刚刚处理完,还没有提交offset,自己却挂了。这个时候,你肯定会再消费一次。自己保证幂等就好了。生产环境中遇到的一个问题是,我们的Kafka消费者消费数据后,先写入一个内存队列,然后缓冲。结果,有时候,你只是将消息写入内存队列,然后消费者会自动提交偏移量。那么这时候我们重启系统的时候,内存队列中还没有来得及处理的数据就会丢失。Kafka丢失数据比较常见的场景之一是Kafka的某个broker宕机,然后重新选举partition的leader。想一想,如果此时其他follower刚好有一些数据没有同步,leader此时挂掉了,选举出一个follower成为leader后,不就丢失了一些数据吗?这丢失了一些数据。我们在生产环境中也遇到过,我们也是。在kafka的leader机器宕机之前,将follower切换为leader后,我们会发现数据丢失了。所以这时候一般至少需要设置以下4个参数:为topic设置replication.factor参数:这个值必须大于1,每个partition至少要有2个replicas。在Kafka服务器上设置min.insync.replicas参数:这个值必须大于1。这是为了要求一个leader至少感知到至少有一个follower还在和自己保持联系,没有掉队,从而保证leader挂掉之后还有follower。.在producer端设置acks=all:这是要求每条数据都写到所有replicas才算成功。producer端设置retries=MAX(一个很大的值,表示无限重试):这个是要求一旦写入失败,就无限重试,卡在了这里。我们的生产环境是按照上面的要求配置的。这样配置之后,至少在Kafkabroker端,可以保证当leader所在的broker发生故障时,切换leader时不会丢失数据。生产者会丢失数据吗?如果按照上面的思路设置acks=all,是不会丢失的。要求是当你的leader收到消息,所有follower都同步消息后,才算写入成功。如果不满足这个条件,生产者会自动不断重试,无限重试。