什么是协调器?协调者是用来协调多个消费者正确工作的角色,比如计算消费的分区分配策略,或者消费者入组和离组的处理逻辑,有点类似于Kafka控制器的角色。协调员的角色。协调器分为两种:消费者组协调器和消费者协调器。组协调器(GroupCoordinator)可以理解为每个消费者协调器的中央处理器,每个消费者的所有交互都是通过组协调器(GroupCoordinator)完成的。选举Leaderconsumerclient处理client申请加入groupRebalancing和同步新的分配方案维护和client心跳检测管理Consumer已经消费了offsets并存储在__consumer_offsetConsumercoordinator每个client都会有一个consumercoordinator,其主要作用是发起arequesttogroupcoordinator进行交互,处理回调逻辑向groupcoordinator发起grouprequest向groupcoordinator发起同步grouprequest(如果是Leaderclient,那么也会计算分配策略数据和放入入参)发起离开请求与groupcoordinator的心跳线程保持联系向groupcoordinator发送请求提交消耗的offsetkafka上有很多GroupCoordinator协调器。与Controller不同的是只有一个Controller,GroupCoordinator(组协调器)是根据内部Topic的数量__consumer_offset来决定的。有多少个__consumer_offset分区?那么有多少组协调器(GroupCoordinator)。但是每个partition可能有多个replicas,那么每个groupcoordinator应该分配到哪里呢?每个__consumer_offset分区的Leader副本在哪个Broer上,那么对应的coordinator在哪。详见:查找CoordinatorFindCoordinatorRequest请求流程如何确定每个ConsumerGroup对应哪个Coordinator默认情况下,__consumer_offset有50个partition,每个consumergroup会对应其中一个partition。)%分区数;当消费者加入群组过程中,JoinGroup客户端在启动或重连时会发起JoinGroup请求申请加入群组。JoinGroup时序图消费者客户端发起第一个请求,协调器为其计算一个MemberId返回给消费者客户端发起第二个请求。MemberId是刚刚获取的。消费者组协调器处理请求并在组中构建新的MemberMetadata元信息缓存。consumergroupcoordinator将状态流转为PreparingRebalance来初始化Generation数据,比如generationId+1,Group状态流转为CompletingRebalance。当然,如果当前Group成员为空,则流程改为Empty;上面的数据组装成JoinGroupResult返回给AllMembers,当然如果是LeaderMembers,还会给他附加所有Member的meta信息(因为需要用到这些数据来计算分配给新partition的数据。)consumerclient拿到数据后,就像是consumergroup的coordinator发起SyncGroupRequest请求。如果是LeaderMember,会根据分区策略计算出新的分配策略,将数据带到SyncGroupRequest请求中。关于SyncGroupRequest,请看:KafkaConsumerSyncGroupRequest流程分析详细请看:KafkaConsumerJoinGroupRequest流程分析GroupCoordinator同步流程SyncGroup当前client完成JoinGroup后,会收到JoinGroup回调,然后client会再次向组协调器发起SyncGroup请求,获取新的分配方案。但在这个过程中,新的分配方案是由Leader客户端计算出来的,会同步给groupcoordinator。然后组协调员将这些结果回调给许多客户。消费者离群流程LeaveGroup当消费者客户端关闭/异常时,会触发LeaveGroup请求。组协调器将始终对每个客户端进行心跳检测。如果检测失败,客户端将被踢出群组。提议的过程也很容易触发再平衡。心跳检测客户端加入群组后,会一直维护一个心跳线程,与群组协调器保持感知。并且组协调器会为每个加入组的客户端做一个心跳监控。如果检测过期,则将其踢出组并重新平衡。分区分配策略Q&A1.如果多个client配置了不同的分配策略,哪个配置会生效?ConsumerGroup下的所有成员使用相同的分配策略进行分配是肯定有必要的。所以GroupCoordinator面临着选择哪种分配策略。选择逻辑如下:在1的基础上选择所有Member支持的分配策略,优先选择每个partition.assignment.strategy配置最靠前的策略。请看下面两个例子。Case-1的所有支持的分配策略是:roundrobin,rang。每个消费者根据1为自己投票。Consumer-0投票给roundrobin,consumer-1投票给rang。Consumer-3投票给roundrobin;这样roundrobin就有2票,所以长期选择roundrobin作为分配策略;Case-2支持的所有分配策略是:rang不需要投票,如果有新成员加入组直接选择rang被选举,如果携带的分配策略与组内所有Member支持的协议不重叠现有组(如果组有成员),将抛出异常:INCONSISTENT_GROUP_PROTOCOL2022-09-0814:34:12,508]INFO[ConsumerclientId=client2,groupId=consumer0]Rebalancefailed。(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)org.apache.kafka.common.errors.InconsistentGroupProtocolException:组成员支持的协议与现有成员的协议不兼容,或者第一个组成员试图加入空协议类型oremptyprotocollist.[2022-09-0814:34:12,511]ERRORErrorprocessingmessage,terminatingconsumerprocess:(kafka.tools.ConsoleConsumer$)2.消费者消费并提交后,如果其他消费者知道我已经消费了,就不会再消费了?一个分区只能被同一个消费组中的消费者消费,消费后消费者会将消费偏移量提交给组协调器存储。存储的地方是Kafka内部的Topic__consumer_offset,存储的数据结构如下:Key:Value:可以看到Key的结构是group+topic+partition。当rebalancing发生时,即使partition被分配给了其他消费者If,它也会使用这个key来寻找当前消费的offset。简单来说就是:同一个消费者组下,一个分区只会被一个消费者消费。消息被消费后,会存储在Topic内部__consumer_offset中,当过期策略紧凑(压缩)存储offset时,key结构为group+topic+partition,所以即使重平衡后,不同读取的offset同一组的消费者是在上一个消费者提交之后。3、consumer消费的offset存储结构是什么样子的?存储结构请看:Kafka消费者组offset数据结构图_consumer_offset4.知道了offset存储结构,如果让你重新设置offset,你觉得应该怎么办?由于消费者消费的offset保存在Topic内部__consumer_offset中,因此消费者在消费时首先读取这个Topic的最新值,key的结构为group+topic+partition;如果我们要修改偏移量,只要改变这个值的数据大小即可。并且因为它的过期策略是紧凑的,我们只需要为Topic的指定key(group+topic+partition)发送一个新的offset值即可__consumer_offset。如果您想重置偏移量,只需向该主题发送墓碑消息以告知它。5.如果__consumer_offset扩大,offset记录会不会丢失?
