当前位置: 首页 > 科技观察

Kafka的消费和心跳

时间:2023-03-16 01:54:51 科技观察

攻略Kafka是一个分布式、分区、多副本、多订阅者的消息发布订阅系统(分布式MQ系统),可以用来查询日志、监控日志、访问日志等。Kafka是一个分布式、分区、多副本、多订阅者的消息发布订阅系统(分布式MQ系统),可以用来查询日志、监控日志、访问日志等,今天小编就带大家一起来了解一下Kafka消费和心跳机制。1.Kafka消费首先我们来看一下消费。Kafka提供了一个非常简单的消费API。用户只需要初始化Kafka的BrokerServer地址,然后实例化KafkaConsumer类即可获取Topic中的数据。一个简单的kafka消费示例代码如下:();props.put("bootstrap.servers","dn1:9092,dn2:9092,dn3:9092");//指定Kafka集群地址props.put("group.id","ke");//指定消费组props.put("enable.auto.commit","true");//开启自动提交props.put("auto.commit.interval.ms","1000");//自动提交timeinterval//反序列化消息主键props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//反序列化消费记录props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");returnprops;}/**实现单线程消费者*/@Overridepublicvoidrun(){//创建消费者实例对象KafkaConsumerconsumer=newKafkaConsumer<>(configure());//订阅集合消费主题consumer.subscribe(Arrays.asList("test_kafka_topic"));//实时消费识别booleanflag=true;while(flag){//获取主题消息数据ConsumerRecordsrecords=consumer.poll(Duration.ofMillis(100));for(ConsumerRecordrecord:records)//循环打印消息记录System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());}//发生异常,关闭消费者对象consumer.close();}}通过上面的代码,我们可以很方便的获取到Topic中的数据。然而,当我们调用poll方法拉取数据时,KafkaBrokerServer做了那些事情。接下来,我们可以看看源码的实现细节。核心代码如下:org.apache.kafka.clients.consumer.KafkaConsumerprivateConsumerRecordspoll(finallongtimeoutMs,finalbooleanincludeMetadataInTimeout){acquireAndEnsureOpen();try{if(timeoutMs<0)thrownewIllegalArgumentException("Timeoutmustnotbenegative");if(this.subscriptions.hasNoSubscriptionOrUserAssignment()){thrownewIllegalStateException("Consumerisnotsubscribedtoanytopicsorassignedanypartitions");}//pollfornewdatauntilthetimeoutexpireslongelapsedTime=0L;do{client.maybeTriggerWakeup();finallongmetadataEnd;if(includeMetadataInTimeout){finallongmetadataStart=time.milliseconds();if(!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs,elapsedTime))){returnConsumerRecords.empty();}metadataEnd=time.milliseconds();elapsedTime+=metadataEnd-metadataStart;}else{while(!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)){log.warn("Stillwaitingformetadata");}metadataEnd=time.milliseconds();}finalMap>>records=pollForFetches(remainingTimeAtLeastZero(timeoutMs,elapsedTime));if(!records.isEmpty()){//在返回获取的记录之前,我们可以发送下一轮获取//并避免阻塞等待他们的响应以启用流水线,而用户//正在处理获取的/获取的记录//注意:由于消耗的位置已经更新,我们不得允许//在返回获取的记录之前触发唤醒或任何其他错误。如果(fetcher.sendFetches()>0||client.hasPendingRequests()){client.pollNoWakeup();}返回this.interceptors.onConsume(newConsumerRecords<>(finlongs));d=time.milliseconds();elapsedTime+=fetchEnd-metadataEnd;}while(elapsedTime>>pollForFetches(finallongtimeoutMs){finallongstartMs=time.milliseconds();longpollTimeout=Math.min(coordinator.timeToNextPoll(startMs),timeoutMs);//如果数据已经可用,立即返回//sendanynewfetches(won'tresendpendingfetches)fetcher.sendFetches();//Wedonotwanttobestuckblockinginpollifwearemissingsomepositions//sincetheoffsetlookupmaybebackingoffafterafailure//NOTE:theuseofcachedSubscriptionHashAllFetchPositionsmeansweMUSTcall//updateAssignmentMetadataIfNeededbeforethismethod.if(!cachedSubscriptionHashAllFetchPositions&&pollTimeout>retryBackoffMs){pollTimeout=retryBackoffMs;}client.poll(pollTimeout,startMs,()->{//因为提取可能由后台线程完成,所以我们需要这个轮询条件//以确保我们不会不必要地阻塞轮询()返回!fetcher.hasCompletedFetches();});//在长时间的轮询之后,我们应该检查组是否需要存储平衡//在返回数据之前,以便组可以稳定fasterif(coordinator.rejoinNeededOrPending()){returnCollections.emptyMap();}returnfetcher.fetchedRecords();}在上面代码中加粗的位置,我们可以看到每次消费者客户端拉取数据时,通过poll方法,首先在fetcher中调用fetchedRecords函数。如果取不到数据,会发起新的sendFetches请求。消费数据时,每批从KafkaBrokerServer拉取数据,有最大数据量限制。默认是500条记录,由属性(max.poll.records)控制,可以在客户端设置,来调整我们每次消费时拉取的数据量。提示:这里需要注意的是,max.poll.records返回的是一次轮询请求的数据总和,与分区数无关。因此每次消费从所有分区拉取的数据项总数不会超过max.poll.records设置的值。在Fetcher类中,在sendFetches方法中,对拉取的数据容量有限制,由属性(max.partition.fetch.bytes)决定,默认为1MB。可能会有这样的场景。当满足max.partition.fetch.bytes限制时,如果Fetch需要输出10000条记录,每次默认是500条,那么我们需要执行20次来fetch这次通过网络发起的所有请求。完全的。说到这里,可能有同学会有疑问。我们不能将默认的max.poll.records属性值调整为10000吗?可以调整,但是还有一个属性需要配合在一起。这个是每次轮询的超时时间(Duration.ofMillis(100)),这里需要根据你每条数据的实际容量来确定超时时间,如果你把最大值调整为10000,当每条数据的容量record很大,超时时间还是100ms,那么拉不到10000条数据是有可能的。而这里,还有一个需要注意的地方,就是session超时的问题。session.timeout.ms默认为10s,group.min.session.timeout.ms默认为6s,group.max.session.timeout.ms默认为30min。当你在处理消费的业务逻辑时,如果10s内没有处理完,消费者客户端就会和KafkaBrokerServer断开连接,消费的数据和产生的offset不能提交给Kafka,因为KafkaBrokerServer认为此时consumer程序已经断开连接,即使你设置了autocommit属性,或者设置了auto.offset.reset属性,你消费的时候还是会重复消费,这是因为session.timeout。ms超时引起的。2、在心跳机制的最后,说到由于session超时导致的消息重复消费,为什么会出现超时呢?有同学有这样的疑问。我的消费者线程明明已经启动了,但是还没有退出。为什么?无法消费Kafka的消息?消费组查不到我的ConsumerGroupID?这可能是超时导致的,而kafka是通过心跳机制来控制超时的,这对consumerclient来说是没有用的有趣的是,它是一个异步线程,当我们启动一个consumer实例时,heartbeat线程就开始工作了。org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程,定时发送心跳,检测消费者状态。每个消费者都有一个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息保存在org.apache.kafka.clients.consumer.internals中。Heartbeat中,声明的Schema如下所示:privatefinalintsessionTimeoutMs;privatefinalintheartbeatIntervalMs;privatefinalintmaxPollIntervalMs;privatefinallongretryBackoffMs;privatevolatilelonglastHeartbeatSend;privatelonglastHeartbeatReceive;privatelonglastSessionReset;privatelonglastPoll;privatebooleanheartbeatFailed;心跳线程中的run方法实现代码如下:publicvoidrun(){try{log.debug("Heartbeatthreadstarted");while(true){synchronized(AbstractCoordinator.this){if(closed)return;if(!enabled){AbstractCoordinator.this.wait();continue;}if(state!=MemberState.STABLE){//组不稳定(也许是因为我们离开了组或者因为协调器//kickedus),sodisableheartbeatsandwaitforthemainthreadtorejoin.disable();continue;}client.pollNoWakeup();longnow=time.milliseconds();if(coordinatorUnknown()){if(findCoordinatorFuture!=null||lookupCoordinator().failed())//theimmediatefuturecheckensuresthatwebackoffproperlyinthecasethatno//brokersareavailabletoconnectto.AbstractCoordinator.this.wait(retryBackoffMs);}elseif(heartbeat.sessionTimeoutExpired(now)){//thesessiontimeouthasexpiredwithoutseeingassuccessfulheouldbeat/proorydinhealth/proorbablyshator,coorbablyshatormarkCoordinatorUnknown();}elseif(heartbeat.pollTimeoutExpired(now)){//轮询超时已过期,这意味着前台线程已停止//在调用停止轮询()之间,因此我们明确地离开该组。可能是LeaveGroup();}elseif(!heartbeat.shouldHeartbeat(now)){//在等待重试后再次轮询,以防心跳协调器/分离器/控制器失败或失败.this.wait(retryBackoffMs);}else{heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(newRequestFutureListener(){@OverridepublicvoidonSuccess(Voidvalue){synchronized(AbstractCoordinator.this){heartbeat.receiveHeartbeat(时间.milliseconds());}}@OverridepublicvoidonFailure(RuntimeExceptione){synchronized(AbstractCoordinator.this){if(einstanceofRebalanceInProgressException){//itisvalidtocontinueheartbeatingwhilethegroupisrebalancing.This//ensuresthatthecoordinatorkeepsthememberinthegroupforaslong//asthedurationoftherebalancetimeout.Ifwestopsendingheartbeats,//however,thenthesessiontimeoutmayexpirebeforewecanrejoin.heartbeat.receiveHeartbeat(time.milliseconds());}else{heartbeat.failHeartbeat();//wakeupthethreadifit'ssleepingtorescheduletheheartbeatAbstractCoordinator.this.notify();}}}});}}}}catch(AuthenticationExceptione){log.error("Authenticationerroroccurredintheheartbeatthread",e);this.failed.set(e);}catch(GroupAuthorizationExceptione){log.error("Agroupauthorizationerroroccurredintheheartbeatthread",e);this.failed.set(e);}catch(InterruptedException|InterruptExceptione){Thread.interrupted();log.error("心跳意外中断读”,e);this.failed.set(newRuntimeException(e));}catch(Throwablee){log.error(“Heartbeatthreadfailedduetounexpectederror”,e);if(einstanceofRuntimeException)this.failed.set((RuntimeException)e);elsethis.failed.set(newRuntimeException(e));}finally{log.debug("Heartbeatthreadhasclosed");}}在心跳线程中,有两个最重要的超时函数,分别是sessionTimeoutExpired和pollTimeoutExpiredpublicbooleansessionTimeoutExpired(longnow){returnnow-Math.max(lastSessionReset,lastHeartbeatReceive)>sessionTimeoutMs;}publicbooleanpollTimeoutExpired(longnow){returnnow-lastPoll>maxPollIntervalMs;}2.1.如果sessionTimeoutExpired为sessionTimeout超时时间,会被当前coordinator标记为断开连接,此时Consumers将被移除,重新分配分区和消费者的对应关系,在KafkaBrokerServer中,ConsumerGroup定义了5种状态(如果算上Unknown,应该是6种状态),org.apache.kafka.common.ConsumerGroupState,如下图所示:2.2.如果pollTimeoutExpired触发轮询超时,此时消费者客户端会退出ConsumerGroup。再次轮询时,会重新加入ConsumerGroup,触发RebalanceGroup。KafkaConsumerClient不会帮我们重复轮询,我们需要在实现的消费逻辑中不断调用poll方法。3、分区与消费线程关于消费分区与消费线程的对应关系,理论上消费线程的个数应该小于等于分区的个数。之前有一个观点,一个consumer线程对应一个partition,当consumer线程等于partition的个数时,线程的利用率达到最大。直接使用KafkaConsumerClient实例确实没有问题。但是,如果我们有丰富的CPU,我们其实可以使用大于分区数的线程来提高消费能力。这就需要我们修改KafkaConsumerClient实例,实现消费策略的预计算,利用额外的CPU开启更多的线程,实现消费任务碎片化。Linux应该这样学