当前位置: 首页 > 科技观察

双十一的时候,卡夫卡就这样丢消息,让我措手不及

时间:2023-03-15 17:24:32 科技观察

说实话,今年双十一有点“背锅”了。这次双十一过得很充实,也让我意识到,如果不系统的学习Kafka,就无法对生产集群进行及时的预警,将故障扼杀在摇篮里,所以我也补上了自己的心思去研究Kafka的核心。在这篇文章中,我首先分享一个我没有想到的故障:Kafka生产环境大面积丢消息。首先要说明的是,消息丢失不是因为断电,集群中的副本数为3,消息发送者设置的acks=-1(all)。这么严格的设置,为什么还会出现消息丢失?请作者慢慢听。1、故障现象故障发生时,收到多个项目组的反馈,消费组位置重置为前几天。截图如下:从上面的消费组延迟监控曲线来看,积压量瞬间从零直接飙升,初步怀疑是位置被重置了。为什么那个位被重置?什么?这篇文章不是说要讲Kafka为什么会丢消息吗?你为什么要谈论消费者群体位被重置?头条党!!!NO,NO,NO,各位读者,绝对不是文中有误,请带着这个问题跟我一起探讨。2.问题分析遇到问题不要慌张,要讲道理。基于MQ应用,消费端一般会实现幂等性,即消息可以重复处理,不影响业务。因此,解决方案是先请项目组进行评估,先手动将站点设置到问题发生前30分钟左右,这样可以快速止血。一波操作猛如虎,接下来就是分析问题的原因了。通过查看当时Kafka服务器的日志(server.log),可以看到如下日志:上面的日志被修改为“无法识别”,关键日志如下:Memberconsumer-1-XXingroupconsumerGroupNamehasfailed,removeditfromThelogdirectionabovegroupPreparingtorebalancegroupXXXXonheartbeatexpiration非常明显:因为心跳检测到期,consumergroupcoordinator将consumer从consumergroup中移除,从而触发rebalancing。Consumergrouprebalancing:当topic分区或者consumer的数量发生变化时,consumer需要重新分配partition来实现端到端的consumer负载均衡。在rebalancing期间,消息消费者的消费会被暂停。当消费者重新完成分区的负载均衡后,会继续从服务端拉取消息。这个时候消费者不知道从哪里下手,需要向服务器查询位置。点,让消费者可以从上一个消费点继续消费。现在消费点重置为最早的点,是不是可以理解为点丢了?那为什么会丢分呢?原因有二:服务器丢点,导致客户端无法查询到点客户终端主动向服务器提交-1,导致位置丢失。目前我们公司使用的Kafka版本是2.2.x。消费者组的位置存储在一个系统主题(__consumer_offsets)中,无论是服务器级别还是Topic级别。参数unclean.leader.election.enable设置为false,表示只有ISR集合中的replicas才能参与leader选举,这样可以严格保证站点信息不会丢失或返回到某个历史上的网站。查看客户端提交的站点API,发现用于封装客户端站点的实体类会对站点进行校验。代码截图如下:如果传入的站点是-1,会直接抛出异常,这样客户端就没有机会把-1的站点提交给服务器了,为什么那个站点丢了呢?为了进一步探究,我们不得不将目光投向消费者群体是如何首次获取站点的,从源代码的角度进行分析。为了找到关键日志,并比较日志文件,尝试找到问题的解决方案。2.1客户端位置搜索机制为了探索客户端位置获取机制,作者详细阅读了消费者在启动时的过程。具体入口是KafkaConsumer的poll方法。详细流程图如下:以上核心点解释如下:消费者(KafkaConsumer)的poll方法消息时调用updateAssignmentMetadataIfNeeded方法,主要执行消费者组初始化,消费者组重平衡等元数据相关的工作,以及消费位置获取。如果当前consumergroup订阅的partition(rebalancing后分配的partition)都有位置,则返回true,表示不需要更新位置。如果当前分配的分区没有正确的位置(例如rebalance后新添加的分区),需要向服务器发送查找位置请求,服务器查询__consumer_offsets主题并返回位置信息。如果查询位置,则输出DEBUG级别日志(Settingoffsetforpartition),输出从服务器查询的位置;如果不查询位置,也会输出DEBUG级别的日志(Foundnocommittedoffsetforpartition)。如果没有找到位置,则需要根据消费者组配置的位置重置策略进行重置。具体配置参数:auto.offset.reset,可选值:latestlatestlocationearliestearliestlocationnone不重置位置如果重置位置选择none,会抛出NoOffsetForPartitionException。如果resetlocation是latest或者earliest,consumer会从查询到的location开始消费,输出DEBUG级别的日志(ResettingoffsetforpartitionXXtooffsetXXXX.)不幸的是,consumer的locationsearch机制,打印的processlogkafkaclient是DEBUG级别的,在生产环境下基本不输出,不方便我排查问题(找够证据)。这里不得不吐槽一下Kafka的日志输出策略:站点的变化是非常关键的状态变化,输出这些日志的频率不会很高。日志级别应该使用INFO而不是DEBUG。Kafka的log是Debug,所以当时找不到证据解释。我们只能找出为什么会因为心跳超时而触发再平衡。温馨提示:至于为什么心跳超时会触发rebalancing,在后续故障分析相关的文章中会详细说明。找到rebalancing触发的原因后,在测试环境中进行压测并重现,并将客户端日志级别设置为debug寻找证据。功夫不负有心人,完美找到了上面提到的三个日志:Settingoffsetforpartition位置在第一次查询时找到,不是-1,也不是最早的位置。Foundnocommittedoffsetforpartition经过反复rebalancing,反复查询log,后面也无法正确查询到position,但是没有找到position(返回-1)。将分区XX的偏移量重置为偏移量XXXX。该站点已根据重置策略进行了重置。从上面的日志分析,也可以明确的得出服务器是有存放消费者组的位置的,否则不会出现第一条日志,已经成功找到了一个有效的位置,但是在后续的rebalancing过程中,当站点需要多次查询,它返回-1。服务器在什么情况下返回-1?Broker服务器处理心跳包的入口是kafkaApis的handleOffsetFetchRequest方法。找到获取站点的关键代码,如下图所示:由上可知,服务端返回INVALID_OFFSET=-1L的情况如下:消费者组信息管理器中的缓存(内存)中不存在该消费者组,并且会返回-1,那么什么情况下服务端会没有正在使用的消费者组的元信息呢?领导者选举发生在__consumer_offsets主题的分区中。当前Broker拥有的partition变为follower后,该partition对应的consumergroup的meta信息会被移除。为什么会这样?这背后的原因是Kafka中的consumergroup需要在Broker端选出一个groupcoordinator来协调consumergroup的rebalancing。选举算法是取consumergroupname的hashcode,得到的值与consumer_offsetstopic的partitionnumber取模得到一个partitionnumber,然后是partition的leader节点所在的Broker是消费者组的组协调器。因此,如果partitionleader发生变化,则需要重新选举与之关联的consumergroup的groupcoordinator。删除消费者组时移除设备。消费组状态为GroupState.Dead消费组状态变为Dead,通常有以下几种情况:消费组被删除。__consumer_offsets分区领导者发生变化,触发站点重新加载。首先,将consumergroup的状态改为Dead,然后会在新的partitionleader所在的机器上加载一个新的location,然后引导consumergroup进行rebalance。服务器没有保存消费组的位置信息,也就是说消费组没有提交过上述情况。对于一个长期运行的消费组,会不会出现上述情况呢?找到服务器相关日志,可以明显看到大量__consumer_offsets相关分区有leader选举,很容易触发上面第一种情况。这样消费组发起的OffsetFetch请求可能会返回-1,从而引导消费组按照重置策略进行定位。单击重置。看文章开头,消费组设置的重置策略选择最早,消费组的消费积压瞬间从0暴涨到上亿,可以解释。看到这里,大家是不是顿时觉得“背脊发凉”,如果消费组配置的位置重置策略(auto.offset.reset)是最新的,是不是容易造成消息丢失,即跳过一些消费反而被消耗,示意图如下:本文到此结束。关于Kafka集群中为什么会有大量的__consumer_offsets用于Leader选举,后续文章会一一展开。请继续关注我。3.老实说,由于Kafka服务器使用的编程语言是scala,所以笔者并没有尝试去阅读Kafka的源码,只是详细分析了Kafka的消息发送和消息消费机制。本以为在使用层面控制公司各个项目关于Kafka的问题会很容易,但实际上都是一样的。项目组的咨询我可以游刃有余,但是一旦服务器端出现问题,我还是会有些手足无措。当然,对于集群问题,我们有一套完整的应急预案,但是一旦出现问题,虽然可以快速恢复,但是一旦出现故障,损失就无法避免。因此,我们还是要深入研究自己负责的内容,提前做检查,根据系统的知识提前避免故障。比如大部分朋友应该都知道kafka在后续版本中的消费位置是存储在系统topic__consumer_offsets中的,但是又有多少人知道这个topic的分区一旦发生了leader选举,大量的消费都会随之而来组重新平衡,导致消费组停止消费?因此,笔者将下定决心阅读kafka服务器的相关源码,系统地了解Kafka,在工作中更好地驾驭Kafka。专栏正在路上。感兴趣的朋友可以点击文章前面的标签关注。最后,期待你们的点赞,你们的点赞也是我最大的动力,我们下期再见。