当前位置: 首页 > 科技观察

Kafka如何修改分区Leader

时间:2023-03-13 19:04:05 科技观察

前几天一群小伙伴问我:kafka如何修改优先级副本?他们有一个需求,他们想要指定某个分区中的副本之一作为领导者。这里插入一张图来描述需求分析。对于这样的问题,在我们的生产环境中是比较常见的。经常需要修改topic中某个分区的leader。例如,topic1-0分区有3个副本[0,1,2]。根据“优先副本”规则,副本0必须是领导者。我们都知道分区中只有Leader副本会提供读写副本,其他副本作为备份。如果在某些情况下,“0”副本的性能资源不足,或者网络不好,或者IO压力比较大,那么肯定会对Topic的整体读写性能有很大的影响。这个时候,切换一个压力较小的副本作为Leader就很重要了;prioritycopy:分区中的AR(所有副本)信息,优先排在第一位的副本作为LeaderLeader机制:分区中只有一个leader承担读写,其他副本只作为备份.那么如何实现这样的需求呢?解决方案知道了原理之后,我们就可以想到相应的解决方案了。只需将其替换为您指定的副本即可;AR={0,1,2}==>AR={2,1,0}一般有两种方式可以达到这个目的,我们来分析一下第一种方式:在分区副本重新分配之前,我写过很多文章分区副本的重新分配。如果您想了解更多有关分区副本重新分配、数据迁移和副本扩展和收缩的信息,您可以阅读链接文章。这里我简单说一下;generalpartitioningReplicaredistribution主要有三个过程生成推荐迁移Json文件执行迁移Json文件验证迁移过程是否完成这里我们主要看第二步,看看迁移文件大体是什么样的{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[0,1,2]}]}这个迁移Json的意思是将topic1的分区“0”的副本分配给[0,1,2],也就是说topic1-0分区最终在{brokerId-0,brokerId-1,brokerId-2}中有3份;如果你看过我在source之前写的partitioncopyredistributionprinciple代码分析,那你肯定知道不管你之前的分配方式是什么样子的,最终的副本分配都是[0,1,2],前面的副本会被删除,少数的会被添加;那么我们如果要满足我们的需求,只要把这个Json文件中的"replicas":[0,1,2]改一下,比如改成"replicas":[2,1,0],修改Json后,执行,执行执行,正式开始重新分配过程!迁移完成后,你会发现Leader变成了“2”上面第一个位置的副本。很容易出错。需要先获取原始分区分配数据,然后手动修改Json文件。这里更容易出错,影响也会更大。当然这些都可以通过验证接口来限制。最重要的一点是,副本重新分配目前只能有一个任务!如果您当前有“副本重新分配”任务,则无法在此处执行。“ReplicaRedistribution”是一个比较“重”的操作,错误对集群的影响会比较大方案二:手动修改AR顺序首先,我们知道分区副本的分配数据存放在节点brokers/topics中/{topicName}在动物园管理员中;我们来看一个Topic1的节点数据的例子;{"version":2,"partitions":{"2":[3,2,1],"1":[2,1,3],"4":[2,3,1],"0":[1,3,2],"3":[1,2,3]},"adding_replicas":{},"removing_replicas":{}}数据解释:version:版本信息,现在有“1”和“2”两个版本removing_replicas:需要删除的副本数据,在分区副本重新分配的过程中,数据迁移时会删除多余的副本即将完成,如果删除成功,这里的数据将被清空。在replicaredistribution过程中,会添加新添加的replica,添加完成后会清空这里的数据;partitions:Topic的所有分区副本分发方式;以上表示一共有5个partition以及对应的replica位置;了解了这些之后,如果要修改优先级副本,可以直接修改zookeeper中的节点数据;例如我们把“1”分区的复制位置改为[2,1,3]后,需要重新运行优先复制选举操作,例如执行shbin/kafka-leader-election.sh--bootstrap-serverxxxx:9090通过kafka命令--topicTopic1--election-typePREFERRED--partition1--election-type:PREFERRED表示以优先副本的形式重选。经过这两步,我们修改优先副本的目的就达到了……?其实不是,因为这里只是修改了zookeeper节点的数据,bin/kafka-leader-election.sh中的重选操作是由Controller进行的;如果了解Controller的作用和源码就知道,每个Topic的partitioncopy信息都保存在Controller中,保存在JVM内存中,然后我们手动修改Zookeeper中的nodes,这样做不会触发控制器更新自己的内存。也就是说,即使我们执行kafka-leader-election.sh,它也不会有任何变化,因为优先级副本还没有被感知到被修改;解决这个问题也很简单,让Controller感知数据变化即可。最简单的方法就是让Controller重选,数据重载!总结在zookeeper控制器重选中手动修改“AR”顺序执行分区复制重选操作(优先复制策略)代码简单当然以上功能必须集成到LogiKM中;简单代码如下//这里转成HashMap类型,不要自定义类型,以防后面将kafka节点数据添加到data节点,导致数据丢失.get("partitions");JSONArraypartitions=(JSONArray)partitionJson.get(partition);//部分代码省略//交换序列优先复制Integerfirst=partitions.getInteger(0);partitions.set(0,targetBroker);partitions.set(index,first);zkUtils=ZookeeperUtils.getKafkaZkUtils(clusterDO.getZookeeper());Stringjson=JSON.toJSONString(partitionMap);zkUtils.updatePersistentPath(ZkPathUtil.getBrokerTopicRoot(topicName),json,null);//写入之后成功,异步触发优先副本选举newThread(()->{try{//1.让Controller先重新选举(否则上面的修改还没有生效)(TODO..待优化->FrequentController重选Cluster性能会受到影响)zkConfig.deletePath(ZkPathUtil.CONTROLLER_ROOT_NODE);//等待Controller选举Thread.sleep(1000);//2.然后发起副本重选preferredReplicaElectCommand.preferredReplicaElection(clu??sterId,topicName,partition,"");}catch(ConfigException|InterruptedExceptione){LOGGER.error("重选exception.e:{}",e);e.printStackTrace();}}).start();优缺点优点:达到目标要求,简单,易操作缺点:频繁的Controller改选会对生产环境造成一定的影响;优化改进方案二中需要重选Controller,频繁的选举肯定会对生产环境造成影响;控制器承担了很多责任,比如分区副本重分配、topic删除、Leader选举等等,都是由它来完成的!那么如何在不重选Controller的情况下满足我们的需求呢?我们的需求是当我们修改zookeeper没有节点数据的时候,Controller能够快速感知并更新自己的内存数据就够了;对于这个问题,我会在下一篇文章中介绍这个问题。看完这篇文章,我会问你几个相关的问题。想一想;如果我在修改zk中的“AR”信息时不只是改变顺序,如果有新的或删除的副本会怎样?如果我手动修改brokers/topics/{topicName}/partitions/{partitionsNo.}/state节点,是否可以直接更新leader信息?副本选举的整个过程是怎样的?大家可以想一想,我会在后面的文章中为大家解答问题!