当前位置: 首页 > 科技观察

囧,Kafka生产实践中又出现问题

时间:2023-03-14 14:54:12 科技观察

1。背景最近Kafka日志集群一直在折腾。由于公司部署的应用不断增加,日志采集程序将采集到的日志发送到Kafka集群时有较大延迟,总TPS不可能一直上去。为了不影响业务团队通过日志排查问题,采用先解决问题再排查的方式对Kafka集群进行扩容。然而,扩容之后,比较尴尬的是,在新增的五台机器中,有两台机器的消费发送响应时间明显高于其他机器。为了保证消息服务的稳定性,临时对集群进行了缩容,将本机从集群中移除。具体操作简单粗暴使用killpid命令,却发生了意想不到的事情。发现Java客户端报如下错误:而Go客户端报如下错误:基本上可以认为是部分分区没有onlineleader,无法成功发送消息。2、问题分析为什么会出现这个问题?如果Kafka节点下线,它会自动触发故障转移。分区领导者是否会连任?请带着这个问题,开启我们今天的探索之旅。首先,我们可以先看看当前有问题的分区的路由信息??。从第一张图可以看出主题dw_test_kafka_0816000的101分区消息发送失败。让我们检查它在Zookeeper中的状态。具体命令如下:./zkCli.sh-server127.0.0.1:2181get-s/kafka_cluster_01/brokers/topics/dw_test_kafka_0816000/partitions/101/state这个命令可以看到对应分区的信任信息,如图下图:这里显示leader的状态为-1,而isrlist在broker-1中只有一份,但是此时brokerid为1的机器已经下线了,为什么不呢触发分区领导者的重新选举?其实看到这里,相信只要稍加思索,就能找到端倪。isr字段的值为1,表示该partition的副本数为1,表示该partition只在一个Broker上存储数据。一旦Broker下线,因为集群中其他Broker上没有这个分区的数据,此时不能进行故障转移,因为一旦进行故障转移,该分区的数据就会丢失,影响非常大严肃的。那为什么这个topic的replicacount设置为1呢?那是因为当时集群压力太大,节点间复制的数据量巨大,网卡基本满负荷运行,而且是日志集群,更能接受数据丢失。对于海量复制,将topic的replicacount设置为1。但是,集群节点的宕机和维护是必不可少的。不能说每次宕机都会出现一段时间的数据写入失败。为了解决这个问题,在关闭之前,我们需要移动主题的分区,将主题的分区从需要关闭的集群中移除。具体迁移topic分区的方法可以参考我之前的文章KafkaTopic迁移实践的第三部分。3、Kafka节点离线分区故障转移机制分区Kafka单副本主题在集群中某个节点离线后将无法完成分区故障转移机制。kafka节点下线的一些failover机制。提醒:接下来就是从源码的角度去探究实现原理,加深对这个过程的理解。不感兴趣的可以直接进入本文的第四部分:总结。Kafka依赖的Zookeeper服务器保存了当前集群存活的broker信息。具体路径为/{namespace}/brokers/brokers/ids。具体图如下:ids下的每个节点记录了Broker的一些信息,比如对外提供服务的协议、端口等,值得注意的是这些节点都是临时节点,如下图:一旦对应的Broker下线,对应的节点会被删除,而Kafka集群中的Controller角色在启动时会监听该节点下节点的变化并做出响应,最后调用KafkaController的onBrokerFailure方法。具体代码如下:该方法实现起来比较复杂。我们这里不做过多分散,重点寻找分区故障转移机制,也就是接下来我们具体分析KafkaController的onReplicasBecomeOffline方法,主要是探索分区故障转移机制。3.1onReplicasBecomeOfflinefailover由于这个方法实现起来比较复杂,接下来会详细说明。Step1:对需要设置为离线的分区进行分组。分组依据是是否需要删除。不触发删除的集合用newofflineReplicasNotForDeletion表示,需要删除的集合用newofflineReplicasForDeletion表示。Step2:选择无leader分区,使用partitionsWithoutLeader,代码如下图:无leader分区的标准是:分区的leader副本所在的broker没有下线,没有被下线删除。Step3:将没有Leader的分区状态改为OfflinePartition(离线状态)。这里的状态更新放在KafkaController中的内存中。具体的内存结构:Map[TopicPartition,PartitionState]。Step4:Kafka分区状态机驱动(触发)分区状态从OfflinePartition和NewPartition到OnlinePartition。状态转换主要包括两个重要步骤:调用PartitionStateMachine的doHandleStateChanges方法驱动分区状态机的转换。然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法实现其他Broker上元信息的同步。限于篇幅,本文不会系统地介绍Kafka分区状态机的实现细节,而是首先重点介绍OfflinePartition的离线状态转换为OnlinePartition的过程。先解释一下OfflinePartition转OnlinePartition过程中各个参数的含义:Seq[TopicPartition]分区当前处于OfflinePartition和NewPartition状态,没有被删除的分区。PartitionStatetargetState状态驱动的目标状态:OnlinePartition。PartitionLeaderElectionStrategy分区leader选举策略,这里是OfflinePartitionLeaderElectionStrategy,分区离线状态的leader选举策略这里判断分区是否有效的依据主要是状态机设置的驱动条件,例如,仅分区状态为OnlinePartition、NewPartition、OfflinePartition状态转换为OnlinePartition。接下来重点说一下改成OnlinePartition的具体实现逻辑。具体代码如下:具体实现分为3步:首先,分别选择当前状态为NewPartition的集合和(OfflinePartition或OnlinePartition)分区。对于state为NewPartition的分区,进行分区初始化,一般用于分区扩容或新建topic。分区重选是在state为OfflinePartition或OnlinePartition时进行的,因为这些collection中的分区都是当前没有leader的分区,这些分区暂时不能接受读写。问。下面重点介绍下线状态变为OnlinePartition时分区领导者选举的实现。具体方法是:PartitionStateMachine的selectLeaderForPartitions方法。代码如下:该方法的实现结构比较简单,返回值为两组,其中一组选举成功。选举失败的集合,如果在选举过程中出现可恢复的异常,则进行重试。具体的重试逻辑是通过doElectLeaderForPartitions方法实现的,非常复杂。3.2分区选举机制分区选举是通过PartitionStateMachine的doElectLeaderForPartitions方法实现的,下面会逐步讲解。Step1:首先从Zookeeper中获取需要选举分区的元信息。代码如下:Kafka中topic的路由信息??存储在Zookeeper中,具体路径为:/{namespace}/brokers/topics/{topicName}}/partitions/{partition}/state,具体存储内容如下:Step2:将查询到的topic分区元数据组装成Map的Map结构,代码如下:Step3:将分区中的controllerEpoch与当前KafkaController的epochs进行比较,选择invalid和有效集。具体代码如下:如果当前controller的controllerEpoch小于partition状态下的controllerEpoch,说明有新的Broker取代了当前的Controller成为集群的新Controller。这次不能选leader,会打印log。Step4:根据Leader选举策略进行Leader选举。代码如下:由于我们这次是从OfflinePartition状态转换到OnlinePartition状态,所以我们进入的分支是leaderForOffline。我们稍后会详细介绍这个方法。选举后的返回值为Twosets,其中partitionsWithoutLeaders代表选举Leader失败的分区,partitionsWithLeaders代表成功选举Leader的分区。Step5:选举Leader失败的partition打印相应的日志,加入到失败队列集合中,如下图:Step5:更新选举结果到zookeeper,如下图:Step6:同步将最新的分区选举结果发送给其他Broker节点。LEADER_AND_ISR更新分区状态的请求被其他broker接受后,根据分区的leader和replica信息成为分区的leader节点或slave节点。这块的实现细节会在专栏后续文章中具体提及。OfflinePartitionLeaderElectionStrategy选举策略具体是如何进行选举的呢?接下来我们探讨其实现细节。3.3OfflinePartitionLeaderElectionStrategy选举策略OfflinePartitionLeaderElectionStrategy的选举策略实现代码,参见PartitionStateMachine的leaderForOffline,我们还是一步步来。Step1:主要初始化几个集合。代码如下,对以上变量做一个简单的介绍:partitionsWithNoLiveInSyncReplicaspartitions的replicas所在的broker都没有存活,也可以参与Leader选举。可以在topic级别设置unclean.leader.election.enable,默认为false。Step2:执行partitionleader选举。具体实现代码如下:首先解释一下以下变量的含义:assignmentpartition设置的replicaset(brokerId所在的位置)。liveReplicas当前在线的副本集。具体选举算法如下:offlinetoonline选举算法比较简单:如果unclean.leader.election.enable=false,则从survivingISR集中选择第一个leader成为partition,如果没有survivingISR副本,andunclean.leader.election.enable=true,则选择在线副本,否则返回NONE,说明没有成功选出合适的Leader。然后返回本次选举的结果,完成本次选举。4.小结本文从一个实际生产故障的分析入手。经过分析,当单副本主题下线时,集群中的单个节点将无法写入某些队列。停止的代理上的分区被移动到其他代理。这个过程不会影响消息的发送和消息的消费。