当前位置: 首页 > 科技观察

vivo中Kafka负载均衡的实现

时间:2023-03-12 12:54:55 科技观察

作者|vivo互联网服务器团队-游说副本迁移是Kafka最频繁的操作。对于一个拥有几十万个副本的集群来说,手动完成副本迁移是一件非常困难的事情。CruiseControl是Kafka的运维工具。包括Kafka服务下线、集群负载均衡、副本扩缩容、副本缺失修复、节点降级等功能。显然,CruiseControl的出现让我们运维大规模的Kafka集群变得更加容易。备注:本文基于Kafka2.1.1。1.Kafka负载均衡1.1Producer负载均衡Kafka客户端可以使用partitioner根据消息的key计算partition。如果在发送消息时没有指定key,默认的partitioner会根据roundrobin算法为每条消息分配一个partition;otherwise根据murmur2hash算法计算出key的hash值,最后分区数取分区数取模。显然,这不是我们要讨论的Kafka负载均衡,因为生产者负载均衡看起来没有那么复杂。1.2消费者负载均衡考虑到消费者上下线、主题分区数量变化等情况,KafkaConsumer还负责与服务器交互进行分区重分配操作,以保证消费者可以消费主题分区更平衡的方式,从而提高消费性能;Kafka目前有两种主流的分区分配策略(默认为range,可通过partition.assignment.strategy参数指定):range:在保证均衡的前提下,为消费者分配连续的分区,对应的实现为RangeAssignor;round-robin:在保证平衡的前提下,循环分配,对应的实现是RoundRobinAssignor;0.11.0.0版本引入了一种新的分区分配策略StickyAssignor,它的优点是在保证分区平衡的同时,可以尽可能的保留原有分区。分区分配结果是唯一的,从而避免了很多冗余的分区分配操作,减少了分区重新分配的执行时间。不管是生产者还是消费者,Kafka客户端已经在内部为我们做了负载均衡,还需要讨论负载均衡吗?答案是肯定的,因为Kafka负载不平衡的主要问题存在于服务端而不是客户端。2、Kafka服务器为什么要做负载均衡?我们先来看一下Kafka集群的流量分布(图1)和新上线后集群的流量分布(图2):图1图2从图1可以看出资源组的流量分布网络中每个broker并不是很均衡,而且由于一些topic分区集中在少数几个broker上,当topic流量突然增加的时候,只有部分broker流量会突然增加。这种情况下,我们需要对topic分区进行扩容或者手动进行迁移操作。图2是我们Kafka集群扩容后某个资源组的流量分布。流量不能自动分配给新扩容的节点。此时,我们需要手动触发数据迁移,将流量引向新扩容的节点。2.1Kafka存储结构为什么会出现以上问题?这就需要从Kafka的存储机制说起。下图展示了Kafkatopic的存储结构,其具体层次结构描述如下:每个broker节点可以通过logDirs配置项指定多个日志目录。我们线上机器一共有12个磁盘,每个磁盘对应一个日志目录。每个日志目录下会有几个目录,带有[topic]-[x]字样。该目录用于存放指定主题和分区的数据。相应的,如果topic有3个副本,就会显示在集群中的其他broker节点上。有两个目录与??该目录同名。客户端写入Kafka的数据,最终会按照时间顺序成对生成.index、.timeindex、.snapshot、.log文件,这些文件存放在对应的topic分区目录下。为了实现高可用,我们的在线主题一般都是2份/3份。主题分区的每个副本分布在不同的代理节点上。有时为了降低机架故障的风险,主题分区是不同的。副本还需要分布在不同机架中的代理节点上。了解了Kafka的存储机制后,我们可以清楚的了解到,客户端写入Kafka的数据,会根据topic分区,路由到broker的不同日志目录。只要我们不手动干预,每次路由的结果都不会改变。改变。因为每次的路由结果都不会改变,问题就来了:随着topic数量的不断增加,每个topic的partition数量不一致,最终topic的partition会在Kafka集群中分布不均。比如:topic1有10个分区,topic2有15个分区,topic3有3个分区,我们的集群有6台机器。6个broker中总会有4个broker有2个topic1分区,3个broker有3个topic3分区,以此类推。这种问题会导致分区较多的broker上的出入流量比其他broker高。如果要考虑同一个topic不同partition流量不一致,不同topic流量不一致,再加上我们有7000个topic在线,130000个partition,270000个副本等等。在这样复杂的情况下,总会有集群中的broker负载特别高,也有的broker负载特别低。当broker负载足够高的时候,这个时候就需要我们运维同学介入了。我们需要帮助这些broker减轻压力,从而间接增加集群的整体负载能力。当集群整体负载较高,业务流量会不断增长时,我们将机器扩容到集群中。有的同学觉得扩机是好事。会有什么问题?问题同上,因为发送到topic分区的数据路由结果不会改变。如果没有人为干预,新扩容的机器流量永远为0,集群中原有的broker负载仍然收不到。减轻。3.Kafka如何做负载均衡3.1手动生成迁移计划和迁移如下图,我们模拟一个简单的场景,其中T0-P0-R0代表topic-partition-replica。第一个分区R0副本是领导者。我们可以看到有两个主题T0和T1。T0是5-partition2copy(进出流量分别是10和5),T1是3-partition2copy(进出流量是5和1)。如果严格考虑rack,topicreplicas的分布可能如下:假设我们现在新增一个broker3(Rack2),如下图所示:由于之前已经考虑了topic在rack上的分布,总体来说,broker2的负载较高。我们现在想把broker2上的部分分区迁移到新扩容的broker3上,考虑到机架、流量、副本数等因素,我们将T0-P2-R0、T0-P3-R1、T0-P4-R0这四个分区T1-P0-R1迁移到broker3。似乎还不是很平衡。再把T1-P2分区的leader调换一下:折腾了一番,整个集群平衡了很多。上面迁移副本和leader切换的命令如下:KafkaReplicaMigrationScript#CopyMigrationScript:kafka-reassign-partitions.sh#1.配置迁移文件$vitopic-reassignment.json{"version":1,"partitions":[{"topic":"T0","partition":2,"replicas":[broker3,broker1]},{"topic":"T0","partition":3,"replicas":[broker0,broker3]},{"topic":"T0","partition":4,"replicas":[broker3,broker1]},{"topic":"T1","partition":0,"replicas":[broker2,broker3]},{"topic":"T1","partition":2,"replicas":[broker2,broker0]}]}#2.执行迁移命令bin/kafka-reassign-partitions.sh--throttle73400320--zookeeperzkurl--execute--reassignment-json-filetopic-reassignment.json#3.查看迁移状态/清除限速配置bin/kafka-reassign-partitions.sh--zookeeperzkurl--verify--reassignment-json-filetopic-reassignment.json3.2使用负载均衡工具-cruise了解Kafka存储结构后手动干预topicpartiti在分布式上,我们可以看到Kafka运维非常繁琐。有没有什么工具可以帮助我们解决这些问题呢?答案是肯定的。Cruisecontrol是LinkedIn开发的解决Kafka集群运维难题的项目。Cruisecontrol可以动态负载均衡Kafka集群的各种资源。这些资源包括:CPU、磁盘使用率、入站流量、出站流量、副本分发等,并且cruisecontrol还有preferredleaderswitching和topicconfigurationchange等功能。3.2.1巡航控制架构下面简单介绍一下巡航控制的架构。如下图所示,主要由Monitor、Analyzer、Executor、AnomalyDetector四部分组成:(来源:巡航官网)(1)MonitorMonitor分为client-sideMetricsReporter和server-sideMetricsSampler:MetricsReporter实现了Kafka的指标报告接口MetricsReporter将原生的Kafka指标以特定格式报告给topic__CruiseControlMetrics。MetricsSampler从__CruiseControlMetrics获取原生指标,然后根据代理和分区级指标聚合它们。聚合指标包括代理和分区负载平均值和最大值等统计值。这些中间结果将发送到topic__KafkaCruiseControlModelTrainingSamples和__KafkaCruiseControlPartitionMetricSamples;(2)Analyzer作为巡航控制的核心部分,Analyzer根据用户提供的优化目标和Monitor生成的集群负载模型生成迁移计划。在巡航控制中,“用户提供的优化目标”包括两类:硬目标和软目标。hardgoal是Analyzer在做pre-migration时必须满足的一种目标(例如:copy在迁移后必须满足rack的去中心化原则),softgoal是尽可能达到的目标,如果acopy迁移后只能满足hardgoal和softgoal中的一个,hardgoal为主,如果有hardgoal不满足,分析失败。Analyzer可能需要改进:由于Monitor生成了整个集群的负载模型,我们的Kafka平台将Kafka集群划分为多个资源组,不同资源组的资源利用率差别很大,所以原生的集群负载模型不再适用到我们的应用场景。大部分业务生产时不指定key,所以各个partition的负载偏差不大。如果主题分区副本在资源组内均匀分布,则资源组也将变得平衡。原生巡航控制将从集群维度进行平衡工作。指定资源组后,可以从资源组维度进行均衡工作,但无法满足跨资源组迁移的场景。(3)Executor作为执行者,Executor执行Analyzer分析出的迁移计划。它将迁移计划以接口的形式批量提交给Kafka集群,然后Kafka根据提交的迁移脚本进行副本迁移。Executor可能需要改进:cruisecontrol在执行replicamigration等功能时无法触发集群preferredleader的切换:集群平衡过程中有时会出现shutdownrestart,问题机器partition的leader由于preferredleader不能自动切换回来,会导致集群中其他节点的压力急剧增加,这时候往往会发生连锁反应。(4)AnomalyDetectorAnomalyDetector是一个定时任务。定时检测Kafka集群是否失衡,是否存在缺失副本等异常。当Kafka集群中出现这些情况时,AnomalyDetector会自动触发集群内的负载均衡。在后面的主要功能描述中,我将主要介绍Monitor和Analyzer的处理逻辑。3.2.2Balancebroker/machineoffline和offlinebalance的入站和出站流量上面我们已经介绍了Kafka集群中broker之间流量负载不均的原因、图表和解决方案,那么cruisecontrol是如何解决这个问题的。其实cruisecontrol平衡集群的思路和我们手动平衡集群的思路大致相同,只是需要Kafka集群详细的指标数据,根据这些指标,计算出负载差距brokers之间,并根据我们关注的Resources做分析,得出最终的迁移方案。服务端收到均衡请求后,Monitor首先会根据缓存的集群指标数据构建一个能够描述整个集群负载分布的模型。下图简单描述了整个集群负载信息的生成过程。smaplefetcher线程会将获取到的native指标加载到一个更具可读性的MetricSample中,进一步处理获取brokerid、partition等指标的统计信息,这些指标存储在对应broker和replica的load属性中,所以broker和repilca都会包含流量负载、存储大小、当前副本是否是leader等信息。Analyzer会遍历我们指定的broker(默认是集群中的所有broker)。由于每个broker及其topic分区副本都有详细的指标信息,分析算法直接根据这些指标和指定的资源对broker进行排序。此示例中的资源是主题分区的领导者副本数。然后Analyzer会根据我们预先设置的最大/最小阈值和离散因子判断是否需要增加或减少当前broker上某个topic的leader副本数。如果增加了,就改变它。clustermodel将负载比较高的broker上相应的topicleader副本迁移到当前broker,反之亦然。在下面的改造要点中,我们将简要描述Analyzer的工作过程。在遍历了所有的broker,分析了我们指定的所有资源之后,我们得到了最终版本的集群模型,然后将其与我们最初生成的集群模型进行对比,生成topic迁移方案。Cruisecontrol会按照我们指定的迁移策略,将主题迁移计划提交到Kafka集群中批量执行。迁移方案示意图如下:3.2.3切换preferredleader切换非preferredleader的副本。迁移方案示意图如下:3.2.4主题配置变化改变主题副本数。当集群规模很大时,我们很难平衡整个集群。平衡一次往往需要半个月甚至更长的时间,无形中增加了我们运维同学的压力。对于这种情况,我们还修改了定速巡航。我们在逻辑上将Kafka集群划分为多个资源组,让业务有自己的资源组。当某个业务的流量出现波动时,不会影响其他业务。通过指定资源组,我们每次只需要对集群的一小部分或多个部分进行均衡,大大缩短了均衡的时间,使得均衡过程更加可控。修改后的cruisecontrol可以实现以下几点:通过平衡参数,我们可以只平衡一个或多个资源组的broker。更改主题配置时,例如添加主题副本,新扩展的副本需要与原主题副本在同一个资源组中。在资源组中,分析broker上的资源是迁入还是迁出。对于每一类资源目标,cruisecontrol是计算资源组内的统计指标,然后结合阈值和离散因素来分析broker是资源迁出还是迁入。如下图所示,我们在数据库中保存资源组下的集群、资源组和主题的元数据,Analyzer可以根据指定资源组范围内的资源分布目标对各个broker进行均衡分析。例如:在对broker-0做balance分析时,Analyzer会遍历goals列表,每个goal负责一类资源负载目标(cpu、入站流量等),当balance分析到goal-0时,goal-0会判断broker-0的负载是否超过上限。如果是,则需要将broker-0的部分topic副本迁移到负载较低的broker;否则,您需要将其他代理的副本迁移到broker-0。其中,以下复核目标是做余额分析时排名靠后的目标。在更新集群模型之前,会判断这次迁移是否会和之前的目标冲突。如果有冲突,集群模型将不会更新。当前的目标会不断尝试迁移到其他broker,直到找到合适的迁移目标,然后更新集群模型。3.3.2topic/topic分区迁移到指定broker考虑这些场景:一个项目下会有几个资源组。由于业务变动,业务希望将资源组A下的主题迁移到资源组B。业务希望将公共资源组的主题迁移到C资源组。平衡完成后,发现总有几个topic/partition分布的不是很均匀。面对这些场景,上述指定资源组进行均衡的功能已经不能满足我们的需求了。因此,我们针对上述场景修改的cruisecontrol可以做到:只平衡指定的topic或topicpartition;平衡主题或主题分区只迁移到指定的代理。3.3.3新目标分析——topicpartitionleadercopy去中心化业务方发送数据时大多不指定key,因此同一topic的各个partition的流量和存储接近,即每个topic的各个partition在leadercopies时集群的brokers尽可能均匀地分布在集群的broker上,集群上的负载将非常均匀。有同学会问了,topic分区的个数并不总是能被broker的个数整除,所以最后各个broker的负载还是不一致?答案是肯定的,仅仅通过分区的leader副本是无法达到最终平衡的。针对上述场景修改的cruisecontrol可以实现以下几点:新增一种资源分析类型:topicpartitionleadercopydispersion。首先保证每个topic的leader和follower副本尽可能均匀的分布在资源组的broker上。在2的基础上,replicas会尽可能的分发给负载较低的broker。如下图所示,对于每一个topic副本,Analyzer会依次计算当前broker的topicleader数量是否超过上限阈值。如果是这样,Analyzer将跟踪主题的Leader副本数、主题的Follower副本数以及broker的传出流量。Load等选择AR中的follower副本作为新的leader进行切换。如果AR副本中没有符合要求的经纪商,将选择AR列表之外的经纪商。3.3.4最终均衡效果下图为某资源组均衡后的流量分布。节点之间的流量偏差很小。在这种情况下,不仅可以增强集群抵御异常流量激增的能力,还可以提高集群的整体资源。使用和服务稳定,降低成本。3.4安装/部署cruisecontrol3.4.1客户端部署:指标采集【第一步】:创建Kafka账号,用于后续指标数据的生产和消费【第二步】:创建3个Kafka内部主题:a用于存放Kafka服务Nativejmx指标;b和c分别用于存放cruisecontrol处理的partition和modelindicators;【step3】:为step1创建的account授予read/write和cluster操作权限,用于read/writestep2创建的topic;【第四步】:修改Kafka的server.properties,增加如下配置:在Kafka服务上配置采集程序#修改Kafka的server.propertiestemetric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReportercruise.control.metrics.reporter.bootstrap.servers=域:9092cruise.control.metrics.reporter.security.protocol=SASL_PLAINTEXTcruise.control.metrics.reporter.sasl.mechanism=SCRAM-SHA-256cruise.control.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule需要username=\"ys\"password=\"ys\";【第五步】:将cruise-control-metrics-reporterjar包添加到kafkalib目录:mvcruise-control-metrics-reporter-2.0.104-SNAPSHOT.jarkafka_dir/lib/;【第六步】:重启Kafka服务。3.4.2服务器部署:索引聚合/平衡分析(1)从https://github.com/linkedin/cruise-control下载zip文件并解压;(2)上传你本地cruisecontrol子模块下生成的jar包替换cruisecontrol:mvcruise-control-2.0.xxx-SNAPSHOT.jarcruise-control/build/libs;(3)修改巡航控制配置文件,主要关注以下配置:#修改巡航控制配置文件security.protocol=SASL_PLAINTEXTsasl.mechanism=SCRAM-SHA-256sasl.jaas.config=org.apache.kafka.common。security.scram.ScramLoginModule需要用户名=\"ys\"密码=\"ys\";bootstrap.servers=域名:9092zookeeper。connect=zkURL(4)修改数据库连接配置:#Clusteridcluster_id=xxxdb_url=jdbc:mysql://hostxxxx:3306/databasexxxdb_user=xxxdb_pwd=xxx4.总结通过上面的介绍,我们可以看出Kafka存在明显的问题缺陷二:Kafka的每个partitionreplica都绑定到机器的磁盘上,partitionreplica由一系列的segment组成。因此,单分区存储往往会占用比较大的磁盘空间,对磁盘造成很大的压力。当集群中的代理扩展时,必须进行重新平衡。broker需要有一个良好的执行流程,保证各个broker的负载均衡,不出现故障。CruiseControl的诞生就是为了解决Kafka集群运维的难题。可以解决Kafka运维难的问题。参考文章:1.linkedIn/cruise-control2.KafkaCruiseControl简介3.ClouderaCruiseControlRESTAPIReference4.http://dockone.io/article/24346645。https://www.zhenchao.org/2019/06/22/kafka/kafka-log-manage/