当前位置: 首页 > 后端技术 > Java

图Kafka消费者分区分配策略

时间:2023-04-01 20:52:08 Java

1。分配策略的作用在分析生产者的时候,我们专门写了一篇文章来分析生产者的分区分配策略Kafka生产者的三种分区策略生成器的分配策略是为了给我们生成的消息选择一个合适的分区发送,所以今天我们来讲解一下消费者的分区分配策略。他要做的就是分配同一个消费组中不同消费者可以消费的partition数量;同一个消费组中,一个分区只会被一个消费者消费。2.分配策略的选择2.1分配策略配置每个consumergroupclient可以配置一个partition.assignment.strategy属性,可以配置自身支持的多种分配策略,例如:partition.assignment.strategy=org.apache.kafka.clients。consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor默认策略是org.apache.kafka.clients.consumer.RoundRobinAssignor2.2选择合适的策略,因为每个clientmember可以自己配置多个支持的分配策略,哪个分配GroupCoordinator(消费者组协调器)用来分配这些资源的策略是什么?ConsumerGroup下的所有成员使用相同的分配策略进行分配是肯定有必要的。所以GroupCoordinator面临着选择哪种分配策略。选择逻辑如下:在1的基础上选择所有Member支持的分配策略,优先选择每个partition.assignment.strategy配置最靠前的策略。请看下面两个例子:caseconsumer-0consumer-1consumer-2选择策略case-1roundrobin,rangrang,roundrobin,strickroundrobin,rangroundrobin每个consumer在1的基础上给自己投票consumer-0票给roundrobin,consumer-1票给自己响了,consumer-3投票给roundrobin。这样roundrobin有2票,所以选择roundrobin作为Allocation策略;Case-2支持的所有分配策略都是:rang不需要投票,rang直接被选中。如果有新成员加入该组,其带来的分配策略与现有组中所有成员相同(如果该组有成员)如果支持的协议不交叉,将抛出异常:INCONSISTENT_GROUP_PROTOCOL[2022-09-0814:34:12,508]INFO[ConsumerclientId=client2,groupId=consumer0]重新平衡失败。(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)org.apache.kafka.common.errors.InconsistentGroupProtocolException:组成员支持的协议与现有成员的协议不兼容,或者第一个组成员试图加入空协议类型或空协议0[9202协议列表。-0814:34:12,511]ERROR错误处理消息,终止合作nsumer进程:(kafka.tools.ConsoleConsumer$)当当前发起JoinGroup的成员完成JoinGroup并调用onCompleteJoin时,在GroupMetadata#selectProtocol中调用该协议选择的代码逻辑。详见:KafkaConsumerJoinGroupRequest流程分析3。分配策略计算和传播3.1分配策略计算时机既然我们知道分区分配策略的选择,那么什么时候触发这个策略的逻辑计算呢?如果你看过之前的文章:Kafka消费者JoinGroupRequest流程分析那么你肯定会对这个有一定的了解回调当前发起JoinGroup请求的Member(成员)最特别的一个特点就是GroupCoordinator(GroupCoordinator)会把所有成员(member)的元信息打包返回给LeaderMember,而FollowMember不会在返回的LeaderMember收到回调并拿到meta信息后,开始计算每个member所属的partition应该被分配。代码定位ConsumerCoordinator#performAssignment@OverrideprotectedMapperformAssignment(StringleaderId,StringassignmentStrategy,ListallSubscriptions){ConsumerPartitionAssignorassignor=lookupAssignor(assignmentStrategy);if(assignor==null)thrownewIllegalStateException("Coordinatorselectedinvalidassignmentprotocol:"+assignmentStrategy);//省略一些代码...//更新所有订阅的Topics的元信息//如果元信息有变化,updateGroupSubscription(allSubscribedTopics);//省略部分代码...Mapassignments=assignor.assign(metadata.fetch(),newGroupSubscription(subscriptions)).groupAssignment();如果(协议==RebalanceProtocol.COOPERATIVE){validateCoperativeAssignment(ownedPartitions,assignments);}//省略部分代码...}上面代码主要是根据分配策略获取分配策略实例,然后调用assign方法计算分配方法,但是最后调用的计算逻辑是每个AbstractPartitionAssignor实现类的assign方法。您还可以实施自定义分配策略。您只需要实现接口AbstractPartitionAssignor。3.2分配策略传播在3.1分配策略计算时序中,我们知道了分配策略的计算时序,那么计算完成后如何通知其他Member自己对应的分配状态呢?每个Member收到JoinGroup的回调后,都会发起一个SyncGroupRequest,其中LeaderMember会使用刚刚计算出的分配策略作为入参发起请求。请看下图。上面发起的请求只是通知组协调器(GroupCoordinator)的分配,最后还需要组协调器(GroupCoordinator)通知每个Member。那么这个通知过程就是所有成员同步完成后的回调;具体可以参考:KafkaConsumerSyncGroupRequest详解4.图中所有的分配策略都解释清楚了,那么Kafka目前支持哪些分配策略呢?下面一一分析4.1RangeAssignor范围分区分配策略partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor这也是默认的分配策略,以单个Topic为维度计算分配。它只负责将每个主题的分区按字母顺序分发给消费者消费组中的所有消费者(成员),主题的分区按分区号排序。先计算分配给每个分区的最小平均分区数,然后将剩下的一一划分。例如:Topic1有11个分区;如果有3个消费者订阅,那么每个11/3=3的平均值剩下2个,那么前两个可以分成4个partition,最后一个可以分成3个;[4,4,3]最终分配如下|消费者|成员:client1-ba0ebe99-cd09-42e9-87b9-11b6f828bfcaTopic1-0,Topic1-1,Topic1-2,Topic1-3Member:client2-cbfb4cf2-c878-41d2-852c-86d56dbb99c2Topic1-4,Topic1-5,Topic1-6,Topic1-7Member:client3-ad60e7a5-204f-4741-b66f-f89Top1,Topica2Topic1-9,Topic1-10的分配是先分配一个consumer,再分配下一个,和遍历不同。clientId-1首先分配给[0,1,2,3]个分区,然后是分区。[图片上传失败...(image-83ca39-16635538??88622)]图片中的会员为消费者。对于消费组来说,其内部对象是MemberRange的劣势。Range对于单个Topic来说是比较均衡的,但是如果有很多Topic的话,这种情况下,Members排在前面的load可能会比Members排在后面的负载多很多。看,这种情况下,3个Member都订阅了这4个Topic,但是这么多Member的分区没有分配到一个4.2RoundRobinAssignor轮询分区策略对所有Member和所有TopicPartitions进行排序。轮训遍历分布Member-3offlineRoundRobin的一些缺点如果成员订阅不同的主题,最终的结果可能不会完全平衡。如果图中的Memner-3订阅Topic-4的数量比另外两个多,那么他一共消费了6个partition,而另外两个只消费了2个partition。如果这里的Member-3将分区Topic2-0和Topic3-1分配给另外两个分区,那就是最平衡的情况。那么有什么策略可以解决这个问题呢?接下来,我们还有另一种分区策略——粘性分区4.3StickyAssignor粘性分区策略上面介绍的两种分区分配方式或多或少都会有一些分布偏差,每次重新分配都会全部重新分配。计算分配一次,每次分配的结果都会有很大的偏差。如果在计算的时候能够考虑到之前的分配,尽量减少分配的变化,就是一个最优解。之前讲生产者的时候也谈到了粘性分区:Kafka生产者的3种分区策略那么消费者的粘性分区策略是什么样的呢?目标:分区的分布尽可能均衡。每次重新分配的结果应与上次分配结果一致。当这两个目标发生冲突时,应优先考虑第一个目标。第一个目标是每个赋值算法尽可能完成的目标,第二个目标真正体现了StickyAssignor的特性。首先,StickyAssignor粘性分区分配时,是根据RoundRobinAssignor的分配逻辑计算的,但是弥补了RoundRobinAssignor可能造成不平衡的一些缺点。比如我们说的是RoundRobinAssignor的缺点的case,但是在StickyAssignor中,下图的分布情况优化了RoundRobinAssignor的缺点来体现粘性分区的重新分配,或者上面的case(右边的StickAssignor上图),如果Member-2离线,stickypartition的计算方式是将离线Member所属的分区分配给其他Member,在分区已经拥有的前提下尽量平衡其他成员保持不变。Member-2有3个分区,分为Member-1的2个分区和Member-3的1个分区。最终的分配图如下:4.4CooperativeStickyAssignor策略上面分析的StickyAssignor粘性分区策略主要是为了保证consumerclients在rebalancing之后能够保持原来的分配方案。但是,StickyAssignor仍然属于RebalanceProtocol.EAGER协议。再平衡时,每个client首先要放弃当前持有的资源。为了解决这个问题,出现了CooperativeStickyAssignor分配策略。可以理解为CooperativeStickyAssignor的赋值策略和StickyAssignor类似。但是它在这个基础上使用了RebalanceProtocol.COOPERATIVE协议。渐进式再平衡。后面会写一篇文章来解释这块内容,挖个坑0.04.5自定义赋值策略先来看看分区策略的类图。如果我们想自定义分配策略,只需要实现接口:}/***分区分配的计算逻辑*/GroupAssignmentassign(Clustermetadata,GroupSubscriptiongroupSubscription);/***当组成员从领导者那里收到分配时调用的回调*/defaultvoidonAssignment(Assignmentassignment,ConsumerGroupMetadatametadata){}/***指示要使用的再平衡协议*默认使用RebalanceProtocol.EAGER协议,另一个选项是RebalanceProtocol.COOPERATIVE*/defaultListsupportedProtocols(){returnCollections.singletonList(RebalanceProtocol.EAGER);}/***返回分配器的版本,它指示用户元数据编码*和分配算法是如何演变的。*/defaultshortversion(){return(short)0;}/***分配器的名称RangeAssignor,RoundRobinAssignor,StickyAssignor,CooperativeStickyAssignor等词*对应的名字是*range,roundrobin,sticky,cooperative-sticky*/Stringname();当然我们也可以根据自己的需要实现其他的抽象类,比如:AbstractStickyAssignorabstraction类是专门用于粘性分区的抽象类。5.rebalanceprotocol上面我们讲了分区策略,但是分区策略本质上分为两类:RebalanceProtocol.EAGERRebalanceProtocol.COOPERATIVE协同rebalance,kafak2.4的功能,这两个不同的是EAGERrebalance协议需要一个consumer在参与重新平衡事件之前始终撤销它拥有的所有分区。因此,它允许对分配进行彻底重新洗牌。COOPERATIVE协议允许消费者在参与再平衡事件之前保留他们当前拥有的分区。分配器不应立即重新分配任何拥有的分区,而是可以向消费者指示他们需要撤销分区,以便在下一次重新平衡事件中将它们重新分配给其他消费者。进入每次小规模再平衡的过程,直到最终收敛平衡。COOPERATIVE的有效提升来自于此前EAGER协议的再平衡触发的stop-the-world(STW)。我们上面提到的三种分配策略分别是RebalanceProtocol.EAGER协议RangeAssignor范围分区分配策略RoundRobinAssignor轮询分区策略StickyAssignor粘性分区策略和CooperativeStickyAssignor分配策略使用的是RebalanceProtocol.COOPERATIVE协议。关于rebalance协议的更多解释请看:KafkaRebalance的两种协议