当前位置: 首页 > 科技观察

Go语言如何操纵Kafka保证消息不丢失

时间:2023-03-12 10:26:04 科技观察

背景目前,一些互联网公司的核心业务都是使用消息队列。因为是核心业务,所以他们对数据的最终一致性很敏感。如果中间出现数据丢失,会引起用户抱怨,年终性能变成325。之前和几个朋友聊天,他们公司都在用kafka做消息队列。使用Kafka会丢失消息吗?消息丢失如何采取补偿措施?在这篇文章中,我们将一起分析并讨论介绍如何使用Go来操作Kafka而不会丢失数据。本文对kafka的操作基于:https://github.com/Shopify/saramakafka架构初识维基百科对kafka的介绍:Kafka是Apache软件基金会开发的开源流处理平台,用Scala编写和爪哇。该项目的目标是为处理实时数据提供一个统一的、高吞吐量、低延迟的平台。它的持久层本质上是一个“基于分布式事务日志架构的大规模发布/订阅消息队列”,这使得它作为处理流数据的企业级基础设施非常有价值。此外,Kafka可以通过KafkaConnect连接到外部系统(用于数据输入/输出),并提供KafkaStreams-一个Java]流处理库。该设计深受事务日志的影响。Kafka整体架构比较简单,主要由producer、broker、consumer组成:截图2021-09-1210.00.13AM我们针对架构图对各个模块进行讲解:Producer:数据的生产者,可以发布数据到选定的主题。消费者:数据的消费者由消费者组标识。主题中的每条记录都将分配给订阅消费者组中的一个消费者实例。消费者实例可以分布在多个进程或多台机器上。Broker:消息中间件处理节点(服务器),一个节点就是一个broker,一个Kafka集群由一个或多个broker组成。下面再介绍一些其他的概念:Topic:可以理解为消息的集合。主题存储在代理中。一个主题可以有多个分区。一个主题可以有多个生产者来推送消息。一个主题可以有多个消费者向它拉取消息,一个主题可以存在于一个或多个代理中。分区:它是主题的子集。不同的partition分配在不同的broker上进行水平扩展,增加Kafka的并行处理能力。同一个topic下不同partition的信息不同,同一个partition的信息是有序的;每个partition都有一个或多个replicas,其中选举出一个leader,fowler从leader拉取数据更新自己的log(每个partition逻辑上对应一个log文件夹),consumer拉取信息给leader。kafka丢失消息的三个节点。生产者推送消息。node首先看producer的写入大致流程:producer首先从kafka集群中找到partition的leader。producer向leader发送消息,leader将消息写入本地的follower,并从leader处拉取,将消息写入本地日志后,leader发送ack。领导者收到ISR中所有副本的ACK后,它会添加高水位线并向生产者发送确认。截图2021-09-1211.16.43AM通过这个过程我们可以看到Kafka最后会返回一个ack来确认推送消息的结果。这里kafka提供了三种模式:NoResponseRequiredAcks=0WaitForLocalRequiredAcks=1WaitForAllRequiredAcks=-1NoResponseRequiredAcks=0:表示数据推送成功与我无关WaitForLocalRequiredAcks=1:当本地(leader)确认接收成功成功,可以返回WaitForAllRequiredAcks=-1:当所有的leader和follower都成功接收后,就会返回。因此,根据这三种模式,我们可以推断生产者在推送消息时有一定概率会丢失,分析如下:如果我们选择模式1,则该模式下数据丢失的概率为非常高,我们无法重试。如果我们选择模式2,这种模式下只要leader不挂掉,就可以保证数据不丢失,但是如果leader挂了,follower还没有同步数据,就会有一定的几率丢失数据.如果选择模式3,这种情况不会造成数据丢失,但可能会造成数据重复。如果leader和follower同步数据是网络的问题,可能会造成数据重复。因此,在生产环境中,我们可以选择方式二或者方式三来保证消息的可靠性。我们需要根据业务场景来选择。如果我们关心吞吐量,我们可以选择模式2。如果我们不关心吞吐量,我们可以选择模式3。如果我们要充分保证模式3是最可靠的选择,不会丢失数据。kafka集群本身的故障导致kafka集群在接收到数据后将数据持久化存储,最后将数据写入磁盘。写磁盘这个步骤也有可能造成数据丢失,因为在写磁盘的时候操作系统会先把数据写入缓存中,而操作系统把缓存中的数据写入磁盘的时间是不确定的,所以在这种情况下,如果Kafka机器突然死机,也会造成数据丢失,但是这种情况发生的概率很小。一般公司内部的kafka机器都会做备份。这种情况是极端的,可以忽略不计。消费者拉取消息节点推送消息时,会向Partition追加数据并分配偏移量。这个偏移量表示当前消费者消费的位置。通过这个Partition,也可以保证消息的顺序。当消费者拉取到某个消息后,可以设置自动提交或者手动提交。如果提交成功,偏移量会发生偏移:screenshot2021-09-12pm3.37.33,所以自动提交会造成数据丢失,手动提交会带来数据重复的问题,分析如下:设置自动提交时,当我们拉取消息的时候,此时已经提交了offset,但是我们在处理消费逻辑的时候失败了,会导致数据丢失在设置手动提交的时候,如果我们处理完消息再提交commit,那么就会失败在commit这一步,会导致重复消费的问题。与数据丢失相比,重复消费更符合业务预期。我们可以通过一些幂等的设计来避免这个问题。实际完整代码已经上传到github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo解决推送消息丢失问题主要通过两点:设置RequiredAcks模式解决,并选择WaitForAll,保证数据推送成功,但是会影响延迟,引入重试机制,设置重试次数和重试间隔,所以我们编写如下代码(提取创建客户端的部分):funcNewAsyncProducer()sarama.AsyncProducer{cfg:=sarama.NewConfig()version,err:=sarama.ParseKafkaVersion(VERSION)iferr!=nil{log.Fatal("NewAsyncProducerParsekafkaversionfailed",err.Error())returnnil}cfg.Version=versioncfg.Producer.RequiredAcks=sarama.WaitForAll//三种模式可选cfg.Producer.Partitioner=sarama.NewHashPartitionercfg.Producer.Return.Successes=truecfg.Producer.Return.Errors=truecfg.Producer.Retry.Max=3//设置重试3次cfg.Producer.Retry。backoff=100*time.Millisecondcli,err:=sarama.NewAsyncProducer([]string{ADDR},cfg)iferr!=nil{log.Fatal("NewAsyncProducerfailed",err.Error())returnnil}returncli}解决拉取message丢失问题的解决方法比较粗糙。直接使用自动提交方式,每次实际消费后手动提交offset,但会造成重复消费的问题,不过好解决,用幂等操作解决即可。代码示例:funcNewConsumerGroup(groupstring)sarama.ConsumerGroup{cfg:=sarama.NewConfig()version,err:=sarama.ParseKafkaVersion(VERSION)iferr!=nil{log.Fatal("NewConsumerGroupParsekafkaversionfailed",err.Error())returnnil}cfg.Version=versioncfg.Consumer.Group.Rebalance.Strategy=sarama.BalanceStrategyRangecfg.Consumer.Offsets.Initial=sarama.OffsetOldestcfg.Consumer.Offsets.Retry.Max=3cfg.Consumer.Offsets.AutoCommit.Enable=true//启用自动提交,需要手动调用MarkMessage才能生效cfg.Consumer.Offsets.AutoCommit.Interval=1*time.Second//Intervalclient,err:=sarama.NewConsumerGroup([]string{ADDR},group,cfg)iferr!=nil{log.Fatal("NewConsumerGroupfailed",err.Error())}returnclient}以上主要是创建ConsumerGroup部分。细心的读者应该已经看到,我们这里使用的是自动提交。手动提交怎么样?这是因为我们的kafka库的特点不一样。这个自动提交需要结合MarkMessage()方法来提交(有疑惑的朋友可以实践一下,或者看看源码),否则会提交失败,因为我们在写消费逻辑的时候,是这样写的:func(eEventHandler)ConsumeClaim(sessionsarama.ConsumerGroupSession,claimsarama.ConsumerGroupClaim)error{formsg:=rangeclaim.Messages(){vardatacommon.KafkaMsgiferr:=json.Unmarshal(msg.Value,&data);err!=nil{returnerrors.New("failedtounmarshalmessageerris"+err.Error())}//操作数据,打印log.Print("consumerClaimdatais")//消息处理成功后,会标记为已处理,然后自动提交session.MarkMessage(msg,"")}returnnil},也可以直接使用手动提交的方式解决。只需要两步:第一步:关闭自动提交:consumerConfig.Consumer.Offsets.AutoCommit.Enable=false//禁用自动提交,改为手动第二步:在消费逻辑中添加如下代码。手动commit模式,需要先标记一下,commitsession.MarkMessage(msg,"")session.Commit()完整代码可以在github上下载验证!总结这篇文章,我们主要讲解了两个知识点:Kafka会产生消息丢失。使用Go操作Kafka,如何配置不丢失数据在日常业务开发中,很多公司喜欢使用消息队列进行解耦,需要注意。使用Kafka作为消息队列并不能保证数据不丢失。我们需要自己手动配置补偿。别忘了,不然又是P0事故