1.前言我们在上一篇文章中分析了Consumer是如何加入ConsumerGroup的。其实上一篇文章是一个很宏观的东西,主要讲的是ConsumerCoordinator和GroupCoordinator是如何通信的。等等,老周,什么是ConsumerCoordinator和GroupCoordinator?这两个组件分别是Consumer和KafkaBroker的协调器。说白了,就是我们设计模式中的门面模式。详情请参考之前的评论。今天的文章主要讲的是上一篇文章中消费者如何加入ConsumerGroup中的Rebalance机制。其实上一篇已经给了大概的概述。本文将更深入的谈谈Rebalance机制的具体细节。如果你是有经验的程序员,我觉得Rebalance机制可以作为一道面试题,难度还是有的。不过也没必要小看自己,跟着老周这篇文章,相信你一定能赢。但也有读者认为还是有难度。别着急,先看看下面Kafka的拓扑结构。这个结构非常清晰。如果你还不知道Kafka的拓扑结构,建议你不要往下看,先搞清楚Kafka的拓扑结构,或者先看一下老周以前的文章再继续看,我觉得效果会更好。本文主要从以下几点讲Rebalance机制:什么是Rebalance机制?触发Rebalance机制的时机组状态变化老消费者客户端的问题Rebalance机制的原理Broker端rebalance场景2.什么是Rebalance机制?Rebalance本质上是一种协议,规定了一个消费者组下的所有消费者如何达成共识,分配给订阅某个主题的各个分区。当新成员加入集群,或者一些topic增加partition时,consumer如何重新分配消费?这就涉及到rebalance的概念。下面给大家解释一下什么是Kafkarebalance平衡机制。从图中我们可以发现consumergroup模型的几个概念:在同一个consumergroup中,一个partition只能被一个consumer订阅消费,但是一个consumer可以订阅多个partition,也就是说每条消息都会只被同一个消费者订阅一个消费组的某个消费者消费,保证不会被重复消费;一个分区可以被不同的消费组订阅。这里有个特例。如果每个消费组只有一个消费者,则将分区广播给所有消费者上面,实现广播方式消费。实现上述消费组模型,需要在外部环境发生变化时,如topic增加新分区,新成员加入消费组等,实现动态调整以维持上述模型,则本工作将交给Kafka再平衡(Rebalance)机制来处理。从图中可以看出,Kafka的rebalancing是由外部触发引起的。让我们看看什么时候触发Kafka再平衡。3、触发Rebalance机制的时机有新的Consumer加入ConsumerGroup,也有部分Consumer宕机下线。消费者不一定需要离线。例如,当consumer由于长时间的GC或者网络延迟,长时间没有向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为Consumer下线。Consumer自愿退出ConsumerGroup(发送LeaveGroupRequest请求)。例如客户端调用unsubscribe()方法取消对某些主题的订阅。Consumer消费超时,未在规定时间内提交offset偏移量。ConsumerGroup对应的GroupCoordinator节点发生了变化。消费者组订阅的任何主题或主题分区的数量发生变化。4.组状态变化4.1ConsumerCoordinator,Consumer端的consumer端门面,继承了AbstractCoordinator抽象类。在协调器AbstractCoordinator的内部类MemberState中,我们可以看到协调器的四种状态,分别是未注册、重新分配后未收到响应、重新分配后收到响应但尚未分配、稳定状态。消费者上述四种状态的转换如下图所示:4.2服务器对Kafka服务器的GroupCoordinator有五种状态:Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。它们的状态转换如下图所示:一个消费者组一开始是Empty。启用rebalancing后,会放在PreparingRebalance中等待成员加入。然后改为CompletingRebalance等待分配方案,最后流向Stable完成Rebalance。当成员发生变化时,消费者组状态从Stable变为PreparingRebalance。此时,所有现有成员都需要重新申请加入群组。当所有群组成员离开群组时,消费群组状态为Empty。消费者组处于Empty状态,Kafka会定期自动删除过期的offsets。5.旧版本消费者客户端的问题ConsumerCoordinator和GroupCoordinator的概念是针对Kafka0.9.0版本之后的消费者客户端。我们暂且把Kafka0.9.0之前的消费者客户端称为老版本的消费者客户端。遗留消费者客户端使用Zookeeper的监听器(Watcher)来实现这些功能。每个消费者组在Zookeeper中维护一个/consumers//ids路径。在该路径下,临时节点用于记录属于该消费者组的消费者的唯一标识consumerldString。consumerldString在消费者启动时创建。消费者的唯一标识由consumer.id+hostname+时间戳+UUID部分信息组成,其中consumer.id是老版本消费者客户端中的配置,相当于新版本中的client.id客户端的新版本。例如消费者的唯一标识为consumerld_localhost-1510734527562-64b377f5,其中consumerld为指定的consumer.id,localhost为计算机的主机名,1510734527562代表时间戳,64b377f5代表部分UUID信息。下图中,与/consumers//ids同级的有两个节点:owners和offsets。/consumers//owners路径记录了分区与消费者的对应关系。/consumers//offsets路径记录分区中的消费组。每个broker、topic、partition在Zookeeper中对应一个路径:/brokers/ids/记录分配给这个broker的hosts、ports、topic分区列表;/brokers/topics/记录了各个分区的leadercopy、ISRset等信息。/brokers/topics//partitions//state记录了当前leadercopy、leaderepoch等信息。每个消费者在启动时在/consumers//ids和/brokers/ids路径上注册一个监听器。当/consumers//ids路径下的子节点发生变化时,表示消费者组中的消费者发生了变化;当/brokers/ids路径下的子节点发生变化时,表示broker增加或减少了。这样,通过Zookeeper提供的Watcher,每个消费者都可以监控消费组和Kafka集群的状态。这样每个消费者分别监听Zookeeper的相关路径。当一个rebalancing操作被触发时,一个consumergroup下的所有consumer都会同时执行rebalancing操作,consumer之间不知道彼此的操作。因此,这会导致Kafka在不正确的状态下工作。同时,这种严重依赖Zookeeper集群的做法有两个严重的问题。羊群效应:所谓羊群效应是指Zookeeper中某个被监控的节点发生变化,大量的Watcher通知发送给客户端,导致通知期间其他操作的延迟,也可能发生类似死锁的事件发生条件。SplitBrain:当消费者进行再平衡操作时,每个消费者都与Zookeeper通信,以确定消费者或代理的变化。由于Zookeeper本身的特性,可能会导致每个消费者同时获取数据。状态不一致,就会导致出现异常问题。6.Rebalance机制的原理Kafka0.9.0版本后的consumerclient被重新设计,将所有的consumergroup划分为多个子集,每个consumergroup子集在server端对应一个GroupCoordinator来管理它们,GroupCoordinator是一个用于管理Kafka服务器中的消费者组的组件。消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。Rebalance的完整过程需要Consumer&Coordinator共同完成Consumer端的Rebalance步骤。加入组:对应JoinGroup请求等待LeaderConsumer分配方案:对应SyncGroup请求当组内成员加入组时,Consumer向协调器发送JoinGroup请求。每个Consumer都会上报它订阅的主题Coordinator。在收集到所有JoinGroup请求后,这些成员中的一个被选为消费者组的Leader。通常最先发送JoinGroup请求的自动成为Leader。LeaderConsumer的任务是收集所有成员的话题,根据这些信息制定具体的分区消费者分配方案。leader选出后,coordinator将所有topic信息封装成JoinGroupResponse发送给leader。LeaderConsumer统一分配方案,进入SyncGroup请求。LeaderConsumer将SyncGroup发送给协调器,并将分配计划发送给协调器。其他成员也会发送SyncGroup请求。协调器将计划以同步组响应的形式发送给所有成员。所有成员成功收到分配方案,消费组进入Stable状态,开始正常消费。具体源码分析可以看我上一篇消费者如何加入消费者组的文章。7.Broker端再平衡场景7.1新成员加入消费者组,新成员在稳定后加入7.2组成员主动离开:ConsumerInstance通过调用close()方法通知协调者退出。本场景涉及到第三个请求:LeaveGroupRequest7.3Groupmemberscrashandleavecoordinator需要等待一段时间才能感知,这个时间段由Consumer端的参数sessionn.timeout.ms控制。Kafka不会超过上述参数。时间感知的崩溃处理过程是相同的。7.4Rebalance时,群成员提交offset。协调员会给成员一个缓冲期,要求每个成员在这段时间内迅速上报自己的抵消额。然后开始正常的JoinGroup/SyncGroup请求。这就是Rebalance机制的全部内容。下一篇文章会讲到如何避免再平衡。本文转载自微信公众号“老周聊结构”,可通过以下二维码关注。转载本文请联系老周聊框架公众号。
