Kafka的消息发送和消息消费都与分区密切相关。让我们从这篇文章开始,学习一些关于分区的知识。本文将重点介绍分区的内部工作机制,即分区状态机运行机制。1.Kafka分区状态Kafka内部分区的运行机制实现为PartitionStateMachine。从这个类的注释中我们可以知道Kafka分区有四种状态。它们是:NonExistentPartition表示该分区不存在,通常该分区从未被创建或创建后被删除。NewPartition分区已经创建,也就是已经分配了copy,但是还没有进行分区leader选举,也就是leader分区和ISR集还不存在,之前的有效状态是NonExistentPartition。OnlinePartition分区在线时的状态,表示分区选举已经完成,leader选举成功。这个时候就可以发送和消费消息了。之前的有效状态是NewPartition/OfflinePartition。当OfflinePartition分区离线时,意味着选出的Leader失效。比如Leader所在的Broker宕机了,之前的有效状态是NewPartition/OnlinePartition。分区的状态变化如下:2.Kafka分区状态机本文以下行为思路将阅读源码,深入PartitionStateMachine的实现细节,提炼分区变化实现的要点,帮助我们运作得更好。维卡夫卡。2.1状态机启动过程状态机的启动过程定义在PartitionStateMachine的启动方法中。该方法的调用时机:当新的Broker通过controller选举成为新的Controller时调用。该方法的声明如下:状态机的启动主要包括两步:初始化分区的状态,触发分区状态向OnlinePartition的转移下面详细讨论实现细节。2.1.1分区状态初始化首先我们看一下分区的初始化过程。具体代码如下:该方法的实现要点:KafkaController中使用ControllerContext来存储内存中controller相关的数据结构,其中Map[String,mutable.Map[Int,Seq[Int]]]partitionReplicaAssignmentUnderlying存储当前集群的所有分区信息(主题名、分区号、副本数)。由于是重选controller,所以需要重新初始化所有partition。然后根据Map[TopicPartition,LeaderIsrAndControllerEpoch]partitionLeadershipInfo存储每个partition当前的runtimestate,分为三种情况:如果topicpartition的Leaer和ISR信息在partitionLeadershipInfo中不存在,driverstate从NonExistentPartition变为NewPartition。如果partitionLeadershipInfo中存在topic分区的leader信息,但是对应的Broker处于离线状态,则driver状态由NonExistentPartition变为OfflinePartition。如果partitionLeadershipInfo中存在topic分区的leader信息,但是对应的Broker已经离线,则状态会先由NonExistentPartition转换为OfflinePartition。值得注意的是调用changeStateTo方法改变分区的状态只是更新内存中的状态。具体实现如图:具体实现是在Map[TopicPartition,PartitionState]中存放需要更新的状态。2.1.2分区状态运行机制根据内存中当前维护的LeaderAndISR信息将状态存入本地内存后,接下来就是将分区状态转换为Online状态。具体代码实现参见PartitionStateMachine的triggerOnlinePartitionStateChange方法。代码如下:该方法的要点是在内存缓存(Map[TopicPartition,PartitionState])中选择OfflinePartition和NewPartition中未被删除的分区,驱动状态机,调用handleStateChanges方法尝试转换到OnlinePartition分区。该方法主要做了以下两件事:调用PartitionStateMachine的doHandleStateChanges方法来驱动分区状态机的转换。然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法实现其他Broker上元信息的同步。为了更清楚全面的了解分区状态的变化,我也给出了所有在Kafka中调用handleStateChanges的调用入口。在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:由于篇幅所限,其他Broker中partition信息的状态同步将在介绍下一篇。PartitionStateMachine的doHandleStateChanges方法在上一篇已经详细介绍过了。不好意思,在Kafka生产实践中已经详细介绍过了。这里我总结提炼一下:目标状态为NewPartition、OfflinePartition、NonExistentPartition。没有复杂的实现逻辑,只是更新内存中的状态,在state-change.log文件中输出状态变化日志。只有当目标状态为OnlinePartition时,才会执行详细的处理逻辑。但是也许你会有一个疑问,当状态变为NewPartition时,什么时候会转换到OnlinePartition状态呢?实际上,调用doHandleStateChanges将目标方法设置为NewPartition后,会立即调用triggerOnlinePartitionStateChange等方法,进一步将状态转化为OnlinePartition状态。由于尴尬,Kafka生产实践中出现了问题。本文详细介绍了从OfflinePartition到OnlinePartition的转换过程。所以本文着重介绍从NewPartition状态到OnlinePartition的转换处理逻辑,其实就是分区。创建的流程,本块代码入口如下:由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,下面一步步讲解。2.1.3分区初始化过程下面详细讨论PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。Step1:首先获取所有分区对应的在线副本,用Seq
