作者个人研发在高并发场景下提供了一个简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。开源半年多以来,已成功为十几家中小企业提供精准定时调度解决方案,经受住了生产环境的考验。为了造福更多的童鞋,这里给出开源框架的地址:https://github.com/sunshinelyz/mykit-delay估计是几年前的运维没有拜服服务器,问题出在Nginx修好了,Kafka又失败了。今天本想睡一会,电话又响了。还在操作,“喂,冰河,你到公司了吗?”赶紧检查服务器,又出问题了。”“在路上,运维哥们不是还在上班吗?”“还在休假……”,我:“……”。咦,这哥们跑了?放过他吧,问题还是要解决。问题在公司再次出现后,我放下专用背包,拿出我的利器——笔记本电脑,打开快速登录监控系统,发现主业务系统没有任何问题。某非核心服务发出告警,监控系统显示该服务频繁抛出以下异常。2021-02-2822:03:05131pool-7-thread-3ERROR[]-commitfailedorg.apache.kafka.clients.consumer.CommitFailedException:Commitcannotbecompletedsincethegrouphasalreadyrebalancedandassignedthepartitionstoanothermember.Thismeansthatthetimebetweensubsequentcallstopoll()waslongerthantheconfiguredmax.poll.interval.ms,whichtypicallyimpliesthatthepollloopisspendingtoomuchtimemessageprocessing.Youcanaddressthiseitherbyincreasingthesessiontimeoutorbyreducingthemaximumsizeofbatchesreturnedinpoll()withmax.poll.records.atorg.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713)~[MsgAgent-jar-with-dependencies.jar:na]atorg.apache.kafka.clients.consumer。internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)~[MsgAgent-jar-with-dependencies.jar:na]atorg.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218)~[MsgAgent-jar-with-dependencies.jar:na]atcom.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121)~[MsgAgent-jar-with-dependencies.jar:na]atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[na:1.8.0_161]在java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[na:1.8.0_161]在java.lang.Thread.run(Thread.java:748)[na:1.8.0_161]输出从上面的异常信息大概可以判断出系统中的问题:Kafka消费者处理完一批poll消息后,同步向broker提交offset时报错,可能是因为当前消费者线程的partition被回收了由broker发出,因为Kafka认为消费者宕机了,正如我们从下面的输出中看到的那样。Commitcannotbecompletedsincethegrouphasalreadyrebalancedandassignedthepartitionstoanothermember.Thismeansthatthetimebetweensubsequentcallstopoll()waslongerthantheconfiguredmax.poll.interval.ms,whichtypicallyimpliesthatthepollloopisspendingtoomuchtimemessageprocessing.Youcanaddressthiseitherbyincreasingthesessiontimeoutorbyreducingthemaximumsizeofbatchesreturnedinpoll()withmax.poll.records.Kafka内部触发了Rebalance机制,明确了问题,接下来,我们就开始分析问题了。AnalysisoftheproblemNow说Kafka触发Rebalance机制,先说一下Kafka触发Rebalance的时机。什么是再平衡?举一个具体的例子。比如某个组下有10个Consumer实例,这个组订阅了一个topic,有50个partition。通常情况下,Kafka会为每个消费者分配5个分区。这个分配过程就是Rebalance。何时触发Rebalance当Kafka中满足以下条件时,就会触发Rebalance:组内成员数量发生变化,例如有新的消费者加入或离开消费者组。群组成员离开消费群组包括群组成员崩溃或主动离开消费群组。订阅主题的数量已更改。订阅的主题分区数已更改。我们可以人为地避免后两种情况。在实际工作过程中,Kafkarebalance最常见的原因是消费者组成员的变化。消费者成员的正常添加和停止会导致Rebalance。这种情况无法避免,但在某些情况下,Consumer实例会被Coordinator误认为“停止”,从而被“踢出”Group,从而导致Rebalance。ConsumerGroup完成Rebalance后,每个Consumer实例都会周期性的向Coordinator发送心跳请求,表示自己还活着。如果一个Consumer实例不能及时发送这些心跳请求,Coordinator就会认为这个Consumer“死了”,将其从Group中移除,并开始新一轮的Rebalance。这个时间可以通过Consumer端的参数session.timeout.ms来配置。默认值为10秒。除了这个参数,Consumer还提供了一个参数来控制发送心跳请求的频率,这个参数就是heartbeat.interval.ms。该值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁发送心跳请求会消耗额外的带宽资源,但是好处是可以更快的知道当前是否开启了Rebalance,因为目前Coordinator通知每个Consumer实例开启Rebalance的方法是在响应中封装REBALANCE_NEEDED标志心跳请求的主体。除了以上两个参数,Consumer端还有一个参数,用于控制Consumer实际消费能力对Rebalance的影响,即max.poll.interval.ms参数。它限制了消费者端应用程序两次调用poll方法之间的最大时间间隔。它的默认值是5分钟,也就是说如果Consumer程序在5分钟内不能消费poll方法返回的消息,Consumer会发起“离群”请求,Coordinator也会开始新一轮的Rebalance.通过上面的分析,我们可以看看那些可以避免的rebalance:第一类不必要的rebalance是因为没有及时发送心跳,导致Consumer被“踢出”group。这种情况下,我们可以设置session.timeout.ms和heartbeat.interval.ms的值,尽可能避免rebalance。(以下配置是网上找到的最佳实践,尚未测试)设置session.timeout.ms=6s。设置heartbeat.interval.ms=2s。需要保证Consumer实例至少能发送3轮心跳请求才被判定为“死”,即session.timeout.ms>=3*heartbeat.interval.ms。设置session.timeout.ms为6s主要是为了让Coordinator更快的定位到挂掉的消费者,尽快将他们踢出Group。第二种非必要Rebalance是Consumer消费时间过长造成的。此时,max.poll.interval.ms参数值的设置尤为关键。如果要避免意外的Rebalance,最好将这个参数值设置大一点,稍微长于最大下游处理时间。总之,要给业务处理逻辑留足够的时间。这样Consumer就不会因为处理这些消息的时间太长而触发Rebalance。PullOffset和CommitOffsetKafka的偏移量(offset)是由消费者管理的。偏移量有两种,拉取偏移量(position)和提交偏移量(committed)。pulloffset代表当前消费者分区的消费进度。每次消息消费后,都需要提交offset。在提交offset时,Kafka会将pulloffset的值作为partition的commitoffset发送给coordinator。如果offset没有提交,下次consumer重连broker时,将从当前consumergroup已经提交给broker的offset开始消费。所以,这就是问题所在。当我们处理消息的时间过长,已经被broker淘汰了,offset的提交又会报错。因此,拉取的offset不会提交给broker,重新平衡partition。下次重新分配分区时,消费者将从最新提交的偏移量开始消费。这就是重复消费的问题。异常日志提示的解决方法其实说了这么多,在Kafka消费者输出的异常日志中也给出了相应的解决方法。接下来说一下Kafka中的pulloffset和commitoffset。其实从输出的日志信息来看,也给出了解决问题的方法。简单来说,max.poll.records可以通过增加max.poll.interval.msduration和session.timeout.msduration配置值来减少,consumer必须在处理完消息后及时提交offset。问题解决通过前面的分析,我们应该知道如何解决这个问题了。这里需要说明的是,我在集成Kafka的时候,使用了SpringBoot和Kafka消费者监听器。消费者的主要代码结构如下。@KafkaListener(topicPartitions={@TopicPartition(topic=KafkaConstants.TOPIC_LOGS,partitions={"0"})},groupId="kafka-consumer",containerFactory="kafkaListenerContainerFactory")publicvoidconsumerReceive(ConsumerRecord,?>record,Acknowledgmentack){logger.info("topicis{},offsetis{},valueis{}n",record.topic(),record.offset(),record.value());try{Objectvalue=record.value();logger.info(value.toString());ack.acknowledge();}catch(Exceptione){logger.error("LogConsumerException:{}",e);}}上面代码逻辑比较简单,也就是get到Kafka中收到消息后,直接打印输出到日志文件中。为了尝试解决这个问题,我首先根据异常日志的提示信息进行配置,所以在SpringBoot的application.yml文件中添加了如下配置信息。spring:kafka:consumer:properties:max.poll.interval.ms:3600000max.poll.records:50session.timeout.ms:60000heartbeat.interval.ms:3000配置完成后,再次测试消费者逻辑,发现还是ThrowsaRebalanceexception。最后,我们换个角度看看Kafka消费者带来的问题:一个Consumer在生产消息,另一个Consumer在消费它的消息。它们不能在同一个groupId下,只需更改其中一个的groupId即可。在这里,我们的业务项目是由模块和子系统开发的。例如,模块A生产消息,模块B消费模块A生产的消息。此时修改配置参数,如session.timeout.ms:60000,根本不起作用,仍然抛出Rebalance异常。此时,我尝试修改消费组的groupId,放入如下代码@KafkaListener(topicPartitions={@TopicPartition(topic=KafkaConstants.TOPIC_LOGS,partitions={"0"})},groupId="kafka-consumer",containerFactory="kafkaListenerContainerFactory")publicvoidconsumerReceive(ConsumerRecord,?>record,Acknowledgmentack){修改为如下代码。@KafkaListener(topicPartitions={@TopicPartition(topic=KafkaConstants.TOPIC_LOGS,partitions={"0"})},groupId="kafka-consumer-logs",containerFactory="kafkaListenerContainerFactory")publicvoidconsumerReceive(ConsumerRecord,?>记录,致谢){再次测试,问题解决~~本文转载自微信公众号「冰河科技」,可关注下方二维码。转载本文请联系冰川科技公众号。
