当前位置: 首页 > 科技观察

Consumer实现逻辑-kafka知识体系(四)

时间:2023-03-19 17:25:46 科技观察

上一篇分享了kafkabroker的实现原理,数据存储结构和信息持久化相关的东西。消息存储后,如何被消费者消费?这篇文章就来说说Kafka消费者端的那些事儿。1)拉机制Kafka的生产端是推机制,即Push,消费者端是拉机制,即Pull。2)Pull的优缺点优点是消费者可以自己控制消息的阅读速度和数量;缺点是不知道服务器有没有数据,所以需要一直拉或者隔一定时间拉,可能要拉多次等待。3)消息传递语义:Kafka默认保证at-least-once传递,允许用户实现at-most-once语义,exactly-once的实现依赖于目的存储系统。4)分区分配策略RangeAssignor:根据分区范围分配,当前默认策略;RoundRobinAssignor:循环分配;StickyAssignor:在Kafka0.11版本引入,根据负载等更多指标,尽可能均匀。这些在之前的文章中也有提到。ConsumerGroupConsumerGroup是Kafka提供的一种可扩展、容错的消费机制。Kafka只使用了ConsumerGroup的机制,但同时实现了传统消息引擎系统的两种模型:消息队列模型和发布/订阅模型。理想情况下,Consumer实例的数量应该等于Group订阅主题的分区总数。【消费者和消费组】Kafka消费者是消费组的一部分。当多个消费者组成一个消费组来消费主题时,每个消费者会收到来自不同分区的消息。假设有一个T1主题,有4个分区;同时,我们有一个消费组G1,这个消费组只有一个消费者C1。然后consumerC1会收到来自这4个partition的消息,如下图:Kafka的一个很重要的特性就是一条消息只需要写一次,它可以支持任意数量的应用读取这条消息。换句话说,每个应用程序都可以读取全部消息。为了让每个应用程序都能读取全部消息,应用程序需要有不同的消费者组。对于上面的例子,如果我们新增一个消费组G2,这个消费组有两个消费者,就会是这样的:这里值得注意的是,一个topic可以被多个消费者组消费,但是每个消费的数据消费组之间互不干扰,即每个消费组都消费完整的数据。一个partition只能被同一个consumergroup中的一个consumer消费,不能split给多个consumer。也就是说,如果你在一个消费者组中的消费者数量多于主题中分区的数量,那么多余的消费者就是无效的消费者分区分配过程,那么我们现在来看一下分配过程。1.确定组协调器每当我们创建一个消费者组时,kafka会为我们分配一个broker作为该消费者组的协调器(coordinator)2.注册消费者并选择一个leader消费者当我们有了协调器后,Consumers就会开始向其注册协调员。第一个注册的消费者会成为这个消费组的leader,后面的就是follower3。当leader被选中后,他会实时从coordinator那里获取partition和consumer信息。并根据分区策略为每个消费者分配分区,并将分配结果告诉协调器。4、follower消费者会从coordinator获取自己的partition信息进行消费。对于所有follower消费者来说,他们只知道自己消费的分区,不知道其他消费者的存在。5、至此,消费者知道了自己消费的分区。分区过程结束。当分区重新平衡发生时,领导者将重复分配过程。具体流程图可以参考上一篇文章。关于位移[displacementoffset]每个消费者在消费消息的过程中,必须有一个字段来记录当前消费在partition中的什么位置。该字段为消费者偏移量(ConsumerOffset),即消费者的消费进度。指标。似乎Offset只是一个数值。其实对于ConsumerGroup来说,就是一组KV对,Key是一个partition,V对应Consumer消费的partition的最新位移。TopicPartition->long但是记住消费者的位移是下一条消息的位移,而不是最新消费消息的位移。提交offsets主要是代表consumer的消费进度,这样当consumer出现故障重启时,可以从kafka中读取之前提交的offset值,然后从相应的offset开始继续消费,从而避免了整个消费过程。【位移存储】实际上,consumer端应用在提交位移时,实际上是将位移提交给了Coordinator所在的Broker。同样,Consumer应用启动时,也会向Coordinator所在的Broker发送各种请求,然后由Coordinator负责进行消费者组注册、成员管理记录等元数据管理操作。老版本的ConsumerGroup在ZooKeeper中保存位移。在新版本的ConsumerGroup中,Kafka社区重新设计了ConsumerGroup的位移管理方式,采用了在Kafka内部topic中保存位移的方式,即__consumer_offsets,俗称位移主题。至于我为什么放弃Kafka来节省位移,请看我之前的文章《基础概念、架构和新版的升级Kafka知识体系1》。【位移topic的数据格式】key位移topic的key要保存三部分:GroupID,topicname,partitionnumber,value主要保存offset信息,当然还有时间戳等信息。请记住,您可以根据消费者开始消费的时间重置时间?【offset提交】1.自动提交最简单的提交方式就是让消费者自动提交offset。如果enable.auto.commit设置为true,那么每隔5s,消费者将自动提交从poll()方法接收到的最大偏移量。可能出现的问题:数据重复读取假设我们还是使用默认的5s提交间隔,rebalancing发生在最新一次提交之后的3s。重平衡后,消费者从上次提交的offset位置开始读取消息。此时offset落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改commitinterval来更频繁地提交offset,减少重复消息可能出现的时间窗口,但这种情况无法完全避免。2.手动提交2.1同步提交的问题从名字上看是同步操作,即方法会等到位移提交成功后才返回。如果提交过程中出现异常,该方法会抛出异常信息。commitSync()的问题在于Consumer程序会一直处于阻塞状态,直到远程Broker返回提交结果,这个状态不会结束。需要注意的是,同步提交会在提交失败后重试。非资源限制造成的阻塞可能是系统的瓶颈,会影响整个应用的TPS,影响吞吐量。2.2异步提交手动提交有缺点。应用程序将阻塞,直到代理响应提交请求,这将限制应用程序的吞吐量。我们可以通过降低提交频率来提高吞吐量,但是如果发生再平衡,则会增加重复消息的数量。这时候可以使用异步提交,直接发送提交请求,不需要等待broker的响应。之所以不重试,是因为当它收到服务器响应时,可能已经成功提交了一个更大的offset。假设我们发送请求提交偏移量2000,此时出现短时通信问题。服务器接收不到请求,自然不会响应。同时,我们处理了另一批消息,并成功提交了offset3000。如果commitAsync()在offset2000处重试commit,可能会在offset3000之后commit成功。如果此时发生rebalancing,就会出现重复的消息。async的问题commitAsync的问题在于它不会在出现问题时自动重试。因为是异步操作,如果提交失败后自动重试,重试时提交的位移值可能已经“过期”或者不是最新值。所以,异步提交的重试其实是没有意义的,所以commitAsync不会重试,所以只要在程序停止前最后一次提交成功即可。这里有一个解决方案,就是不管成功还是失败,我们都会记录offsets信息。如果最后一次提交成功,忽略它。如果上次提交不成功,我们可以在下次重启的时候手动指定offset来整合异步和同步,同时使用commitSync()和commitAsync()来提交。对于定时和分阶段的手动提交,我们调用commitAsync()来避免程序阻塞,在Consumer关闭之前,我们调用commitSync()方法进行同步阻塞位移提交,以保证在Consumer关闭数据之前能够保存正确的位移.关于再平衡Rebalance分区的所有权从一个消费者转移到另一个消费者。这种行为称为再平衡(Rebalance)。Rebalancing非常重要,为消费者群体带来高可用和可扩展性,消费者可以放心的添加或移除。以下是触发重平衡的三种行为:当一个消费者加入组时,它读取了一个原本被其他消费者读取的分区,从而触发了重平衡。当一个消费者离开组(关闭或崩溃)时,它原来读取的分区将被组中的其他消费者读取,从而触发重平衡。当topic发生变化时,比如增加了一个新的partition,就会发生partitionredistribution并触发rebalancing。partitionrebalancing的时候topic不可用,rebalance真的太慢了??!!!这里我将添加在生产环境中由于配置不正确而导致的不必要的分区重新平衡。正常的集群变更不再考虑范围内:1.防止消费者因未能及时发送心跳而被踢出消费组。这里可以设置session.timeout.ms超时时间和heartbeat.interval.ms心跳间隔时间。一般可以设置超时时间为心跳间隔的3倍。2、Consumer的消费时间过长。如果消费者在规定的时间内不能消费到poll的消息,就会认为消费者有问题,消费者会自主离群,所以我们可以将max.poll.interval.ms设置为比处理时间略长。3、从第二点我们也可以指出,如果集群中经常有partitionsbalanced,那么你可能需要观察consumer执行任务所花费的时间,尤其要注意GC所花费的时间。很多时候线上的问题也是配置不合理造成的。