当前位置: 首页 > 科技观察

追根究底:Kafka会丢失数据吗?

时间:2023-03-22 00:04:37 科技观察

今天我们就来说说Kafka生产环境中大家比较关心的问题。那么Kafka会丢数据吗?丢失数据怎么办?只有掌握了这些,才能处理一些Kafka生产级的故障,才能更稳定的为业务服务。仔细阅读本文后,相信您会对Kafka是如何解决数据丢失问题有更深入的了解。本文干货较多,希望大家耐心阅读。01概述越来越多的互联网公司使用消息队列来支持他们的核心业务。由于是核心业务,一般要求在消息传输过程中尽量保证消息不丢失。如果中间环节出现数据丢失,就会招来用户的投诉,年底的表现也会受到指责。那么使用Kafka会不会丢数据呢?丢失数据怎么办?为了避免类似情况的发生,除了采取补偿措施外,我们在系统设计时还应充分考虑系统中的各种异常情况,从而设计出稳定可靠的消息系统。大家都知道Kafka整个架构非常简单,分布式。它主要由三部分组成:Producer、Broker、Consumer。损失情景的分析将从这三个部分开始。02消息传递语义分析在深入分析消息丢失场景之前,我们先来说说什么是“消息传递语义”?所谓消息传递语义,就是Kafka提供的Producer和Consumer之间消息传递过程中消息传递的保证。主要有三种,如下图所示:1)首先,Producer向Broker发送数据后,会commit。如果提交成功,由于Replica副本机制的存在,意味着消息不会丢失,只是Producer将数据发送给了Broker。Broker遇到网络问题导致通信中断后,那么Producer无法准确判断消息是否已经提交(commit),可能会造成至少一次语义。2)在Kafka0.11.0.0之前,如果Producer没有收到消息commit的响应结果,只能重新发送消息,确保消息已经正确传递给Broker,并将消息写入日志重新发送时再次;0.11.0.0版本之后,Producer支持幂等发送选项,保证重发不会导致日志出现重复消息。为了实现这一点,Broker为Producer分配一个ID,并通过每条消息的序列号对其进行去重。它还支持类似的事务语义,以确保将消息发送到多个主题分区,确保所有消息要么写入成功,要么写入失败。这主要用于主题之间的exactlyonce语义。开启幂等传递的方法配置:enable.idempotence=true。启用事务支持的方法配置:设置属性transcational.id="specifiedvalue"。3)从Consumer的角度分析,我们知道Offset是Consumer自己维护的,如果Consumer收到消息后更新了Offset,那么Consumer异常崩溃,然后新的Consumer接手重新开始消费,会造成至多一次语义(消息丢失,但不重复)。4)如果Consumer消费完消息后又更新了Offset,如果此时Consumer崩溃了,那么新的Consumer接手后会使用这个Offset再次拉取消息,这样会造成至少一次语义(消息是没有丢失,但它是多次重复)。总结:默认情况下,Kafka提供了“至少一次”语义的消息传递,允许用户通过在处理消息之前保存Offset来提供“至多一次”语义。如果我们自己能做到消费幂等,理想情况下,这个系统的消息传递是严格的“exactlyonce”,即保证不丢失,只会被恰好处理一次,但这很难实现。从Kafka整体架构图中,我们可以总结出消息传递的三个流程:1)Producer端向KafkaBroker端发送消息。2)KafkaBroker同步消息,持久化数据。3)Consumer端从KafkaBroker拉取并消费消息。以上三个步骤都可能发生数据丢失,那么Kafka在什么情况下才能保证消息不丢失呢?通过以上三步,我们可以得出结论,Kafka只是对“已提交”的消息做“最大持久性保证不丢失”。如何理解上面的句子?1)首先是“已提交”的消息:当Kafka中的N个Broker成功收到一条消息并写入日志文件后,会告诉Producer消息已经提交成功,然后这条消息就变成了一个Kafka中的“提交消息”。我们怎么理解这里的NBrokers呢?这主要取决于“承诺”的定义。这里可以选择只要有一个Broker成功保存消息就认为committed,或者所有Broker都能成功保存消息就认为committed。2)第二个是“最大持久性保证不丢失”,也就是说Kafka不能保证在任何情况下数据都不会丢失。即Kafka有一个不丢数据的前提。如果此时你的消息保存在N个Brokers上,那么前提是N个Brokers中至少有一个是存活的,这样可以保证你的消息不会丢失。也就是说,Kafka可以不丢失数据,但是这些消息必须是“已提交”的消息,并且必须满足一定的条件。在了解了Kafka消息传递的语义和保证不丢包的情况下,下面我们来详细分析下各个环节为什么会丢包,以及如何最大程度的避免丢包。03消息丢失场景分析Producer端丢失场景分析在分析Producer端数据丢失之前,我们先了解一下Producer端发送消息的过程。对于不了解Producer的读者,可以查一下KafkaProducer的消息发送流程如下:1)首先,我们要知道Producer端直接和Broker中的LeaderPartition进行交互,所以在Producer端的初始化中,需要通过Partitioner分区器从Kafka集群中获取相关Topic对应的LeaderPartition的元数据。2)获取LeaderPartition的元数据后直接发送消息。3)KafkaBroker对应的LeaderPartition收到消息后会先将消息写入PageCache,并周期性刷新磁盘进行持久化(顺序写入磁盘)。4)FollowerPartition拉取LeaderPartition的消息,并与LeaderPartition的数据保持一致。消息拉取后,需要回复LeaderPartitionACK确认消息。5)当KafkaLeader和FollowerPartition同步数据并收到ISR中所有replica的ACK后,LeaderPartition会回复Producer一个ACK确认信息。根据上图和消息发送流程可以得出,Producer端为了提高发送效率,减少IO操作,在发送数据时,将多个request合并为一个RecordBatch,封装并转换为Request请求,以“asynchronously”将数据发送出去(也可以定时自动发送),所以Producer端的消息丢失比较多,因为消息根本没有发送到KafkaBroker端。Producer端消息发送不成功的原因有以下几种:网络原因:由于网络抖动,数据根本没有发送到Broker端。数据原因:消息体过大无法被Broker接受,导致Broker拒绝消息。另外,KafkaProducer端也可以通过配置确认消息是否成功生产:KafkaProducer端acks默认配置为1,默认level为至少一次语义,不保证exactlyonce语义。由于Producer端发送数据有ACK机制,这里可能会丢失数据!!!acks=0:由于发送后即认为传输成功,如果出现网络抖动,Producer端不会检查ACK。它丢失了,无法重试。acks=1:如果消息被LeaderPartition发送成功,则表示消息发送成功。这时候只要LeaderPartition不crash,就可以保证LeaderPartition不会丢失数据。但是如果LeaderPartition异常崩溃,FollowerPartition还没有同步完数据,没有ACK,此时数据就会丢失。acks=-1orall:消息的发送需要等待ISR中的LeaderPartition和所有FollowerPartition确认收到消息后,消息发送成功。可靠性最高,但不保证不丢失数据。比如当ISRPartition只剩下Leader时,就变成了acks=1的情况。Broker端丢失场景分析下面我们来看一下Broker端持久化存储丢失场景。对于不了解Broker的读者,可以先说说KafkaBroker。数据存储过程如下图所示:KafkaBroker集群接收数据接收到后,数据会持久化存储在磁盘上。为了提高吞吐量和性能,采用了“异步批量刷盘策略”,即按照一定的消息量和时间间隔刷盘。首先,数据将存储在“PageCache”中。至于什么时候刷新Cache中的数据,是由“操作系统”根据自己的策略决定的,或者调用fsync命令强制刷新磁盘。如果此时Brokercrash了,而一个远远落后于LeaderPartition的FollowerPartition被选为新的LeaderPartition,那么滞后的消息数据就会丢失。由于Broker端的消息存储是通过异步批量刷新的方式进行的,所以这里可能会丢失数据!!!由于Kafka没有提供“同步刷新”的方法,所以还是很有可能从单个Broker上丢失数据。Kafka通过“multi-Partition(分区)multi-Replica(副本)机制”已经能够最大程度的保证数据不丢失。如果数据已经写入PageCache,但没有掉电或者断电,极端情况下还是会导致数据丢失。消费端丢失场景分析接下来我们来看一下消费端消费数据丢失场景。对于不了解消费者的读者,可以先看看KafkaConsumer。我们看一下消费过程:1)Consumerpull在取数据之前,和Producer发送数据一样,需要通过订阅关系获取集群元数据,找到相关Topic对应的LeaderPartition的元数据.2)然后Consumer通过Pull方式主动从Kafka集群中拉取消息。3)在这个过程中,有一个consumergroup的概念(不理解可以看上面链接的文章),多个consumer可以组成一个consumergroup,即ConsumerGroup,每个consumergroup有一个Group-ID。同一个ConsumerGroup中的Consumer可以消费同一个Topic下不同partition的数据,但不会有多个Consumer消费同一个partition的数据。4)消息拉取后,进行业务逻辑处理。处理完成后会进行ACK确认,即提交Offset消费位移进度记录。5)最后将Offset保存到KafkaBroker集群中的Topic__consumer_offsets中,每个Consumer保存自己的Offset进度。根据上图和消息消费流程可以看出,消费主要分为两个阶段:获取元数据和从KafkaBroker集群拉取数据。处理消息,将消息标记为已消费,提交Offset记录。由于Consumer拉取消息后,最终会提交Offset,那么这里可能会丢失数据!!!可以使用“自动提交Offset方式”拉取消息,“先提交Offset,再处理消息”。如果此时处理消息异常down,因为已经提交了Offset,Consumer重启后,会从之前提交的Offset的下一个位置重新开始消费,不会再处理之前未处理的消息。对于消费者来说,消息刚刚丢失。拉取消息后,“先处理消息,再提交Offset”。如果此时提交前出现异常宕机,因为Offset没有提交成功,下次Consumer重启后会从上次的Offset重新拉取消息。不会有消息丢失,但会重复消费。在这里,只有业务本身才能保证幂等性。04消息丢失解决方案以上带大家从Producer、Broker、Consumer三端分析了可能出现的数据丢失场景。下面我们就来看看如何解决,最大程度的保证消息不丢失。Producer端解决方案在分析producer端丢失场景时,我们得出的结论是它是以“异步”的方式发送的,那么如果此时以“发送后烧录”的方式发送,即调用Producer。send(msg)将立即返回。由于没有回调,Broker可能因为网络原因收不到消息,此时就丢失了。因此,我们可以从以下几个方面解决Producer端消息丢失的问题:4.1.1改变调用方式:摒弃发送后调用烧录的方式,转而使用带有回调通知功能的方式发送消息,即Producer.send(msg,callback),这样一旦发现发送失败,就可以进行针对性的处理。Futuresend(ProducerRecordrecord,Callback回调);publicFuturesend(ProducerRecordrecord,Callbackcallback){//拦截记录,可能会被修改;此方法不会抛出异常ProducerRecordinterceptedRecord=this.interceptors==null?记录:this.interceptors.onSend(记录);returndoSend(interceptedRecord,callback);}(1)网络抖动导致消息丢失,Producer端可以重试。(2)如果消息大小不合格,可以在发送前适当调整以满足Broker的容忍度。通过以上方法,可以保证最大的消息发送成功。4.1.2ACK确认机制:该参数表示“committed”消息的定义。需要设置request.required.acks为-1/all,-1/all表示有多少个replicabroker收到了所有的消息才被认为是消息提交成功的标志。对于acks=-1/all,有两种非常典型的情况:(1)数据发送到LeaderPartition,所有ISRmember都同步了数据。此时LeaderPartition异常崩溃,会选举出新的LeaderPartition,数据不会丢失,如下图:(2)数据发送到LeaderPartition,同步部分ISR成员.此时LeaderPartition异常崩溃,剩下的FollowerPartition可能会被选举为新的LeaderPartition。producer端发送failureflag,后面会重新发送数据,数据可能会重复,如下图所示:所以,通过上面的分析,我们还需要配置其他参数来保证:replication.factor>=2min.insync.replicas>1这是Broker端的配置,下面会详细介绍。4.1.3重试次数retries:该参数表示Producer端发送消息的重试次数。Retries需要设置为大于0的数,Kafka2.4版本默认设置为Integer.MAX_VALUE。另外,如果需要保证发送消息的顺序,配置如下:retries=Integer.MAX_VALUEmax.in.flight.requests.per.connection=1,这样Producer端会一直重试,直到Broker端返回ACK标志,同时只有一个连接向Broker发送数据,保证了消息的顺序。4.1.4重试时间retry.backoff.ms:该参数表示消息发送超时后两次重试的间隔时间,避免无效的频繁重试,默认值为100ms,推荐设置为300ms。Broker端解决方案在分析broker端丢包场景时,我们得出的结论是它采用了“异步批量刷新”的策略,先将数据存储在“PageCache”中,然后进行异步刷新。由于没有“同步刷新”策略,所以Kafka采用了“多分区多副本”的方式,最大程度保证数据不丢失。我们可以通过以下参数的配合来保证:4.2.1unclean.leader.election.enable:该参数表示哪些Follower有资格被选举为Leader。IfaFollower'sdatalagsbehindtheLeadertoomuch,thenonceitiselectedLeader,thedatawillbelost,sowehavetosetittofalsetopreventthisfromhappening.4.2.2replication.factor:该参数表示分区副本数。建议设置replication.factor>=3,这样如果Leader副本异常崩溃,Follower副本会被选举为新的Leader副本继续提供服务。4.2.3min.insync.replicas:该参数表示至少有多少消息副本必须成功写入ISR才能被视为“已提交”。建议设置min.insync.replicas>1,以提高消息的持久性,保证数据不丢失。另外,我们还需要保证replication.factor>min.insync.replicas,如果相等,只要有一个replica异常crash,整个partition就无法正常工作,所以建议设置:replication.factor=min.insync.replicas+1,最大化系统可用性。消费端解决方案在分析消费端丢失场景时,我们得出的结论是需要在拉取消息后提交offset位移信息。因此,为了不丢失数据,正确的做法是:拉取数据,处理业务逻辑,提交ConsumeOffset位移信息。我们还需要设置参数enable.auto.commit=false,使用手动提交位移的方式。另外,在重复消费消息的情况下,业务本身保证了幂等性,保证一次消费成功即可。05总结在这里,我们总结一下本文的重点。1、从Kafka的整体架构上,概括了可能发生数据丢失的环节。2、带你分析“消息传递语义”的概念,确认Kafka只对“已提交”的消息做“最大持久化保证不丢失”。3、带你分析Producer、Broker、Consumer可能导致数据丢失的场景以及具体的高可靠方案。