当前位置: 首页 > 科技观察

线上Kafka消息堆积,消费者下线,怎么办?

时间:2023-03-21 21:56:00 科技观察

Kafka消息在线堆积,所有消费者都离线,这是怎么回事?最近处理了一个线上故障。具体故障表现在Kafka主题消息堆积,该主题相关消费者全部下线。整体的排错过程和后面的回顾都很有意思,结合这次失败,让我对Kafka使用的最佳实践有了更深的理解。好吧,让我们一起回顾一下这个在线失败。最佳实践总结在最后,不要错过。一、现象在线kafka消息突然开始堆积。消费者应用反馈没有收到消息(没有处理消息的日志)。在kafka的消费者组上,是没有消费者注册的。consumer应用和kafka集群上周没有代码和配置相关的变化。2、在排查过程中,服务端和客户端都没有特殊的异常日志。其他Kafka主题的生产和消费都正常,基本可以判断是客户端消费有问题。所以我们专注于客户端故障排除。1)Arthas在线修改日志级别,输出debug。由于客户端没有明显的异常日志,要想找到蛛丝马迹,只能通过arthas修改应用日志级别。果然还有更重要的发现:2022-10-2517:36:17,774DEBUG[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]-[ConsumerclientId=consumer-1,groupId=xxxx]Disablingheartbeatthread2022-10-2517:36:17,773DEBUG[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]-[ConsumerclientId=consumer-1,groupId=xxxx]发送LeaveGroup请求给协调器xxxxxx(id:2147483644rack:null)好像是kafka-client主动给kafka集群发送消息,驱逐自己。所以消费者都是线下的。2)Arthas查看相关线程状态变量使用arthasvmtool命令进一步查看kafka-client相关线程的状态。可以看到HeartbeatThread线程状态为WAITING,Cordinator状态为UNJOINED。此时结合源码推测,很可能是客户端因消费时间过长而自驱逐。于是立马尝试修改max.poll.records,减少批量拉取的消息数,同时增加max.poll.interval.ms参数,避免拉取间隔过长导致的自驱逐。参数修改上线后,发现消费者并没有下线,但是消费了一段时间后,还是停止消费了。3.最终原因相关同学查看了消费逻辑,发现业务代码存在死循环,确认了最终原因。消息内容中某个字段有新值,触发consumer消费逻辑死循环,导致后续消息无法消费。同时,消费阻塞导致消费者自驱逐,分区重新平衡,所有消费者一一自驱逐。这里的核心涉及到Kafka的消费者与Kafka之间的保活机制,可以简单理解一下。Kafka-client会有一个独立的线程HeartbeatThread来与kafka集群进行定时心跳。该线程与listener无关,完全独立。根据调试日志中显示的“SendingLeaveGrouprequest”信息,我们很容易定位到自驱逐的逻辑。HeartbeatThread线程在发送心跳之前,会将当前时间与上次轮询时间进行比较。一旦大于max.poll.interval.ms参数,就会发起自驱逐。四、进一步思考虽然最后找到了原因,但回头看整个调查过程,并不顺利。主要有两点:kafka-client是否可以对某个消息消费超时有明确的异常?有没有办法发现消费的无限循环,而不是只看到自我驱逐和再平衡?4.1kafka-client是否可以对某个消息消费超时有明确的异常?4.1.1Kafka好像没有类似的机制。如果我们将消费逻辑断点,我们可以很容易地看到整个调用链路。对于消费者来说,一个线程池主要用来处理每一个kafkaListener,一个listener就是一个独立的线程。该线程会同步处理poll消息,然后动态代理会回调用户自定义的消息消费逻辑,也就是我们写在@KafkaListener中的业务。因此,可以从中学到两件事。第一,如果业务消费逻辑慢或者卡顿,轮询会受到影响。第二点,这里没有参数直接设置消费超时时间,其实不太好做。因为这里做了一个超时中断,所以轮询也会被中断,在同一个线程中。所以要么轮询和消费逻辑在两个工作线程中,要么当前线程中断后,又启动一个新的线程轮询。所以从业务使用的角度来说,可能的实现方式是自己设置业务超时时间。更通用的实现可以是在消费逻辑中,使用线程池来处理消费逻辑,同时使用Futureget来阻塞超时中断。google了一下,发现kafka0.8以前有consumer.timeout.ms这个参数,但是现在的版本没有这个参数。不知道有没有类似的效果。4.1.2RocketMQ有一些相关的机制,后来查了下RocketMQ有没有相关的实现,找到了。在RocketMQ中,可以为消费者设置consumeTimeout。这个超时有点像我们的想法。消费者会启动一个异步线程池对定时消费的消息做cleanExpiredMsg()处理。请注意,如果消息类型是有序的,则此机制不起作用。如果是并发消费,则会进行超时判断。如果超时,这条消息的信息会通过sendMessageBack()方法返回给broker重试。如果消息重试次数超过一定次数,就会进入RocketMQ的死信队列。其实spring-kafka也有类似的封装。可以自定义死信topic,做异常处理。4.2有没有快速找到死循环的方法?一般来说,死循环的线程会导致CPU飙升,OOM等现象。在这个故障中,没有相关的异常表现,所以与死循环问题无关。经历了这次失败,对kafka的相关机制有了更深的理解。轮询间隔超时很可能是消费阻塞甚至死循环造成的。因此,如果下次出现类似问题,消费者停止消费,但是kafkaListener线程还在,可以直接使用arthas的threadid命令查看对应线程的调用栈,看是否有方法异常无限循环调用。5.最佳实践通过这次失败,我们也可以总结出一些使用Kafka的最佳实践:在使用消息队列进行消费时,需要考虑异常情况,包括幂等性、耗时处理(甚至无限循环)的情况。尽量提高客户端的消费速度。消费逻辑开启新的线程进行处理,最好做超时控制。减少群组订阅的主题数量。一个群订阅主题最好不要超过5个。建议一个群只订阅一个主题。参考以下说明调整参数值:max.poll.records:减小参数值,建议远小于<单线程每秒消费记录数>*<消费线程数>*产品。max.poll.interval.ms:该值应大于/(<每秒单个线程消费记录数>*<消费线程数>)的值。