当前位置: 首页 > 科技观察

Kafka分区工作机制长文分析

时间:2023-03-12 19:41:30 科技观察

Kafka的消息发送和消息消费都与分区密切相关。让我们从这篇文章开始,学习一些关于分区的知识。本文将重点介绍分区的内部工作机制,即分区状态机运行机制。1.Kafka分区状态Kafka内部分区的运行机制实现为PartitionStateMachine。从这个类的注释中我们可以知道Kafka分区有四种状态。它们是:NonExistentPartition表示该分区不存在,通常该分区从未被创建或创建后被删除。NewPartition分区已经创建,也就是已经分配了copy,但是还没有进行分区leader选举,也就是leader分区和ISR集还不存在,之前的有效状态是NonExistentPartition。OnlinePartition分区在线时的状态,表示分区选举已经完成,leader选举成功。这个时候就可以发送和消费消息了。之前的有效状态是NewPartition/OfflinePartition。当OfflinePartition分区离线时,意味着选出的Leader失效。比如Leader所在的Broker宕机了,之前的有效状态是NewPartition/OnlinePartition。分区的状态变化如下:2.Kafka分区状态机本文以下行为思路将阅读源码,深入PartitionStateMachine的实现细节,提炼分区变化实现的要点,帮助我们运作得更好。维卡夫卡。2.1状态机启动过程状态机的启动过程定义在PartitionStateMachine的启动方法中。该方法的调用时机:当新的Broker通过controller选举成为新的Controller时调用。该方法的声明如下:状态机的启动主要包括两步:初始化分区的状态,触发分区状态向OnlinePartition的转移下面详细讨论实现细节。2.1.1分区状态初始化首先我们看一下分区的初始化过程。具体代码如下:该方法的实现要点:KafkaController中使用ControllerContext来存储内存中controller相关的数据结构,其中Map[String,mutable.Map[Int,Seq[Int]]]partitionReplicaAssignmentUnderlying存储当前集群的所有分区信息(主题名、分区号、副本数)。由于是重选controller,所以需要重新初始化所有partition。然后根据Map[TopicPartition,LeaderIsrAndControllerEpoch]partitionLeadershipInfo存储每个partition当前的runtimestate,分为三种情况:如果topicpartition的Leaer和ISR信息在partitionLeadershipInfo中不存在,driverstate从NonExistentPartition变为NewPartition。如果partitionLeadershipInfo中存在topic分区的leader信息,但是对应的Broker处于离线状态,则driver状态由NonExistentPartition变为OfflinePartition。如果partitionLeadershipInfo中存在topic分区的leader信息,但是对应的Broker已经离线,则状态会先由NonExistentPartition转换为OfflinePartition。值得注意的是调用changeStateTo方法改变分区的状态只是更新内存中的状态。具体实现如图:具体实现是在Map[TopicPartition,PartitionState]中存放需要更新的状态。2.1.2分区状态运行机制根据内存中当前维护的LeaderAndISR信息将状态存入本地内存后,接下来就是将分区状态转换为Online状态。具体代码实现参见PartitionStateMachine的triggerOnlinePartitionStateChange方法。代码如下:该方法的要点是在内存缓存(Map[TopicPartition,PartitionState])中选择OfflinePartition和NewPartition中未被删除的分区,驱动状态机,调用handleStateChanges方法尝试转换到OnlinePartition分区。该方法主要做了以下两件事:调用PartitionStateMachine的doHandleStateChanges方法来驱动分区状态机的转换。然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法实现其他Broker上元信息的同步。为了更清楚全面的了解分区状态的变化,我也给出了所有在Kafka中调用handleStateChanges的调用入口。在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:由于篇幅所限,其他Broker中partition信息的状态同步将在介绍下一篇。PartitionStateMachine的doHandleStateChanges方法在上一篇已经详细介绍过了。不好意思,在Kafka生产实践中已经详细介绍过了。这里我总结提炼一下:目标状态为NewPartition、OfflinePartition、NonExistentPartition。没有复杂的实现逻辑,只是更新内存中的状态,在state-change.log文件中输出状态变化日志。只有当目标状态为OnlinePartition时,才会执行详细的处理逻辑。但是也许你会有一个疑问,当状态变为NewPartition时,什么时候会转换到OnlinePartition状态呢?实际上,调用doHandleStateChanges将目标方法设置为NewPartition后,会立即调用triggerOnlinePartitionStateChange等方法,进一步将状态转化为OnlinePartition状态。由于尴尬,Kafka生产实践中出现了问题。本文详细介绍了从OfflinePartition到OnlinePartition的转换过程。所以本文着重介绍从NewPartition状态到OnlinePartition的转换处理逻辑,其实就是分区。创建的流程,本块代码入口如下:由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,下面一步步讲解。2.1.3分区初始化过程下面详细讨论PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。Step1:首先获取所有分区对应的在线副本,用Seq>>liveReplicasPerPartition表示。类比Java的数据结构是List>,代码如下:在Kafka中创建主题时,kafka会先根据集群节点的负载情况生成静态负载情况,分区数,副本数,topic的物理rack,存放在/brokers/topics/{topicName}中,其中的数据如下图所示:liveReplicasPerPartition就是根据这个数据结构筛选出在线broker.例如id为4的broker不在线,liveReplicasPerPartition中的值可能如下:["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]Step2:如果一个partition的所有pre-allocatedshard都离线了,打印errorlog,代码如下:Step3:为partition创建leaderIsrAndControllerEpoch信息,代码如下:实现这里比较简单,值得注意的是分区的leader是初始化时ISR列表中的第一个分区。Step4:将分区leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch,ControllerEpoch)的状态信息写入zookeeper,具体代码如下;具体的,在zookeeper中创建/broker/topics/{topicName}/partitions/{partitionnumber}/state,并将leaderIsrAndControllerEpoch写入上述节点,具体效果如下图所示:Step5:处理zookeeper写入结果,对应代码如下:如果在zookeeper中创建成功,则将leaderIsrAndControllerEpoch信息缓存到内存中(Map),并将该信息放入controllerBrokerRequestBatch,KafkaBroker控件将信息同步到集群中的其他Broker,并且会在state-change.log日志文件中记录状态变化日志;如果创建失败,则在state-change.log中输出相应的错误日志。当然:为了尽量保证上述流程的创建成功,Zookeeper的写流程引入了重试机制,保证最终执行成功,除非出现AUTH_FAILED等不可恢复的异常。分区信息写入zookeeper的/broker/topics/{topicName}/partitions/{partitionnumber}/state文件路径后,会再次调用changeTo方法将分区的状态更改为内存中的OnlineParttion。什么时候会触发分区相关文件夹的实际创建?原来分区信息写入zookeeper指定文件后,由于KafkaController订阅了/broker/topics/{topicName}相关节点,节点的创建会实时通知KafkaController进行分区选择。具体代码如下所示:通过Zookeeper的事件监听机制,Kafka巧妙地实现了分区状态机的切换。3.小结通过以上的学习,我们应该对分区有了更深入的了解。从这里我们至少可以得出以下结论:分区的状态主要包括四种状态:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。只有分区状态是OnlinePartition。提供对外读写。Kafka启动时,选举出集群控制器(KafkaController)后,会启动分区状态机(PartitionStateMachine),Kafka会根据/brokers/topics/{topicName}/partitions/{partition_no中的信息驱动分区}/state状态转换到OnlineParttion。当一个topic新建时,Kafka会根据当前集群负载、分区数、副本数、待创建的rack信息进行负载均衡,生成分区意向leader,以及分区副本的分布情况,写入到On/brokers/topics/{topicName}节点,此时会触发PartitionModifications,从而触发分区创建过程,即从NewPartition到OnlineParttion的转换。