1.发起心跳请求。当Consumer客户端启动时,会构建并启动一个心跳监听线程HeartbeatThread。心跳监听线程名称:kafka-coordinator-heartbeat-thread|group.id例如:kafka-coordinator-heartbeat-thread|consumer0/***Java学习资料*wx:javataozi888**/privatebooleanenabled=false;privatesynchronizedvoidstartHeartbeatThreadIfNeeded(){if(heartbeatThread==null){heartbeatThread=newHeartbeatThread();heartbeatThread.start();}}虽然此时启动了,但是在run方法中有一个逻辑标志为enabled=false,实际上此时不会发出心跳监听。它会随着整个消费群的状态变化而变化。1.1启动心跳线程比如我们的consumerclient发起JoinGroupRequest,回调成功后,会设置enabled=trueJoinGroupResponseHandler#handle从下面的代码可以看出,JoinGroupRequest回调时,调用client的状态至COMPLETING_REBALANCE,并开始监听线程1.2挂起心跳线程当客户端状态变为UNJOINED或PREPARING_REBALANCE或心跳线程异常时,心跳线程会暂时停止,因为UNJOINED或PREPARING_REBALANCE状态本身不需要定期检查协调器离线,不用管。1.3发起心跳请求。有如下相关配置。默认值为heartbeat.interval.ms消费者协调器与消费者协调器之间的心跳间隔。当消费者加入或离开集团时促进再平衡。这个值必须设置得低于session.timeout.ms,但通常应该设置为不高于这个值的1/3,也可以设置为低于3000(3秒)synchronizedRequestFuturesendHeartbeatRequest(){log.debug("正在向协调器{}发送带有世代{}和成员ID{}的心跳请求",generation.generationId,generation.memberId,coordinator);HeartbeatRequest.BuilderrequestBuilder=newHeartbeatRequestreturnclient.send(coordinator,requestBuilder).compose(newHeartbeatResponseHandler(generation)));}带上客户端的基本信息发起请求1.4发起LeaveGroup(离开组)请求当客户端检测到当前时间超过session。消费者协调器被标记为空,需要重新寻找协调器当客户端检测到当前时间最后一条客户端轮询消息超过max.poll.interval.ms默认值300000(5分钟)时,会执行LeaveGroup请求AbstractCoordinator#maybeLeaveGrouppublicsynchronizedRequestFuturemaybeLeaveGroup(StringleaveReason){RequestFuturefuture=null;//从2.3开始,只有动态成员才会向broker发送LeaveGroupRequest,拥有有效group.instance.id的消费者被视为静态成员,永远不会发送LeaveGroup,成员过期仅受会话超时控制if(isDynamicMember()&&!coordinatorUnknown()&&state!=MemberState.UNJOINED&&generation.hasMemberId()){//这是离开组的最小尝试。如果请求失败或超时,我们不会//??尝试任何重新发送。log.info("Member{}发送LeaveGroup请求给Coordinator{}由于{}",generation.memberId,coordinator,leaveReason);LeaveGroupRequest.Builderrequest=newLeaveGroupRequest.Builder(重新平衡eConfig.groupId,Collections.singletonList(newMemberIdentity().setMemberId(generation.memberId)));future=client.send(coordinator,request).compose(newLeaveGroupResponseHandler(generation));client.pollNoWakeup();}//重置状态为UNJOINEDresetGenerationOnLeaveGroup();返回未来;}2.3之后,只有动态成员才会向broker发送LeaveGroupRequest,拥有有效group.instance.id的消费者被视为静态成员,永远不会发送LeaveGroup,成员过期仅在session.timeout.ms的控制下,将客户端状态重置为未加入。具体我就不分析了。client向GroupCoordinator发送LeaveGroupRequest后,coordinator做的是3.移除Member并尝试重新平衡2.GroupCoordinator处理请求。下面的代码看起来很多,其实也不是很复杂。它基本上是一些验证逻辑。GroupCoordinator#handleHeartbeatdefhandleHeartbeat(groupId:String,memberId:String,groupInstanceId:Option[String],generationId:Int,responseCallback:Errors=>Unit):Unit={validateGroupStatus(groupId,ApiKeys.HEARTBEAT).foreach{错误=>if(error==Errors.COORDINATOR_LOAD_IN_PROGRESS)//该组仍在加载,所以只是盲目响应UNKNOWN_MEMBER_ID)caseSome(group)=>group.inLock{if(group.is(Dead)){//如果该组被标记为已死,则意味着其他线程刚刚从协调器元数据中删除了该组;//这很可能是该组已迁移到其他//坐标或者该组处于瞬态不稳定阶段。让成员重试//找到正确的协调器并重新加入。responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)}elseif(group.isStaticMemberFenced(memberId,groupInstanceId,"heartbeat")){responseCallback(Errors.FENCED_INSTANCE_ID)}elseif(!group.has(memberId)){responseCallback(Errors.UNKNOWN_MEMBER_ID)}elseif(generationId!=group.generationId){responseCallback(Errors.ILLEGAL_GENERATION)}else{group.currentStatematch{caseEmpty=>responseCallback(Errors.UNKNOWN_MEMBER_ID)caseCompletingRebalance=>//消费者可以在加入组后开始发送心跳响应,在这种情况下//我们应该将它们视为正常的hb请求并重置计时器valmember=group.get(memberId)completeAndScheduleNextHeartbeatExpiration(group,member)responseCallback(Errors.NONE)casePreparingRebalance=>valmember=group.get(memberId)completeAndScheduleNextHeartbeatExpiration(group,member)响应回调(Errors.REBALANCE_IN_PROGRESS)caseStable=>romvalem)completeAndScheduleNextHeartbeatExpiration(group,member)responseCallback(Errors.NONE)caseDead=>thrownewIllegalStateException(s"ReachedunexpectedconditionforDeadgroup$groupId")}}}}}简单来说就是检查Groupcoordinator是否存在当前Member检查状态是否该集团已死。如果是Dead,client需要寻找新的GroupCoordinator和JoinGroup,判断client和GroupCoordinator是否同龄。如果不是同一年龄段,客户端需要重新加入群组。如果status是PreparingRebalance,client会判断,如果是STABLE,会重新JoinGroup。如果GroupCoordinator的当前状态为CompletingRebalance,Stable,则会清理GroupCoordinator设置的延迟过期任务,并重新设置一个新的任务。该任务的执行时间是在配置session.timeout.ms之后。如果假设没有心跳线程请求,那么这个任务就会被执行。如果实施会有什么问题?请继续阅读下面关于JoinGroupRequest的消费者组协调器超时任务,这是来自客户端的加入消费者组的请求。详见:KafkaconsumerJoinGroupRequest流程分析2.1Consumergroupcoordinator超时任务如果在session.timeout.ms期间没有收到客户端的信跳转请求,则执行consumergroupcoordinator超时任务defonExpireHeartbeat(group:GroupMetadata,memberId:String,isPending:Boolean):Unit={group.inLock{if(group.is(Dead)){//如果当前心跳检测到GroupCoordinatorDead,就打印log,因为它可能不是组协调员本身,他不再被允许做任何事情isPending){info(s"Pendingmember$memberIdingroup${group.groupId}hasbeenremovedaftersessiontimeoutexpired.")//当客户端发起第一个JoinGroup请求时,不包括memberId,但是Group会生成一个返回给客户端//此时成员处于Pending状态,属于等待加入状态,因为它还会用这个memberId发起第二次JoinGroup请求,才算真正加入Group//这里直接从Pending缓存中移除这个memberId,因为它的心跳监听已经过期,也就是说客户端需要重新发起第一次JoinremovePendingMemberAndUpdateGroup(group,memberId)}elseif(!group.has(memberId)){黛布ug(s"成员$memberId已从组中删除。")}else{valmember=group.get(memberId)if(!member.hasSatisfiedHeartbeat){info(s"成员${member.memberId}在组中${group.groupId}失败,将其从组中删除")removeMemberAndUpdateGroup(group,member,s"removingmember${member.memberId}onheartbeatexpiration")}}}}如果组状态为已死,则什么都没有不,它已经不是组协调器了,它什么也做不了。如果当前成员处于Pending状态,(先了解Pending状态,因为成员第一次加入Group时没有带上memberId参数,所以groupcoordinator会生成一个MemberId返回给client,而groupcoordinator会在自己身上保存一份Member的数据,但是此时Member处于Pending状态,意识等待加入,因为它会发起第二次JoinGroup请求,而这个MemberId才是真正的JoinGroup在这次。)然后从Pending缓存中删除该成员。表示该Member需要再次发起第一次JoinGroup请求。在其他状态下,如果在判定期间没有心跳请求,则移除Member,更新Group元信息。①.从Groupcoordinator的缓存中移除Member②。如果当前状态是Stable|CompletingRebalance,直接进入prepareRebalance流程。prepareRebalance过程主要是把状态流转到PreparingRebalance,设置一个DelayedJoin超时过期任务。是max.poll.interval.ms默认300000(5分钟)。这个任务会在满足要求时执行onCompleteJoin(所有Member都是JoinGroup)。这与JoinGroup背后的过程相同。主要动作是通知所有Member你已经成功加入,然后你应该发起一个SyncGroup请求。详见:KafkaconsumerJoinGroupRequest流程分析3.客户端处理返回数据HeartbeatResponseHandler#handle这段代码就不贴出来了,主要是根据返回的异常做具体的事情。如果没有异常,什么也不做。异常映射关系如下:COORDINATOR_NOT_AVAILABLE|NOT_COORDINATOR是时候寻找一个新的GroupCoordinatorREBALANCE_IN_PROGRESS;当前groupcoordinator处于Rebalance状态,如果当前client处于STABLE状态,说明它要重新发起JoinGroupRequest,它要加入并参与分配就结束了。非法一代|UNKNOWN_MEMBER_ID|FENCED_INSTANCE_ID:设置Member状态为UNJOINED,重新加入Group4。心跳线程状态图我们可以先看一下消费者客户端Member的状态流程图。理解了这个状态流图,也就可以知道心跳线程状态流图了,因为心跳线程只能运行在两种状态:COMPLETING_REBALANCE,STABLE