当前位置: 首页 > 后端技术 > Java

Kafka调优策略分析

时间:2023-04-01 20:50:50 Java

上一篇我们讲解了Kafka的partition分配策略,StickyAssignor分配策略,RoundRobinAssignor分配策略,RangeAssignor分配策略。具体可以参考Kafka分区分配策略详解。这篇文章,我们就来看看Kafka的调优策略。一般来说,调优离不开监控。Kafka本身并没有提供很好的图形化监控系统,但是有很多第三方的Kafka监控工具做的比较好:BurrowKafkaMonitorKafkaOffsetMonitor通常开发KafkaEagle开发者已经非常熟悉使用Kafka发送数据,但是在这个过程中使用它,很多开发者在使用Kafka的过程中,对参数配置并没有深入探索,损失的是没有充分利用kfka的优势,不能很好的满足业务场景。Producer配置和描述Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("buffer.memory",67108864);props.put("batch.size",131072);props.put("linger.ms",100);props.put("max.request.size",10485760);props.put("acks","1");props.put("retries",10);props.put("retry.backoff.ms",500);KafkaProducerproducer=newKafkaProducer(props);buffer.memorybuffer.memoryKafka客户端发送数据到服务器通常需要缓冲。当你通过KafkaProducer发送一条消息时,它首先进入客户端的本地内存缓冲区,然后将很多条消息收集起来,一条一条地分批发送给Broker。.所以这个“buffer.memory”的本质就是约束KafkaProducer可以使用的内存缓冲区的大小,它的默认值为32MB。明白了这个意思,试想一下,在生产项目中这个参数应该怎么设置呢?大家可以先想一想,如果内存缓冲区设置得太小,可能会导致什么问题?首先要明确的是,内存缓冲区中会缓存大量的消息,一个一个的形成batch,每个batch包含多条消息。然后KafkaProducer的Sender线程会将多个Batch打包成一个Request发送给Kafka服务器。如果内存设置太小,可能会出现问题。消息很快写入内存缓冲区,但是Sender线程来不及向Kafka服务器发送Request。这样会不会导致内存缓冲区被快速填满?一旦满了,它就会阻塞用户线程,阻止它继续向Kafka写消息。因此,参数“buffer.memory”要根据自己的实际情况进行测试。您需要计算您的用户线程每秒将向生产环境中的内存缓冲区写入多少消息。如果每秒有300条消息,则需要进行压力测试。假设内存缓冲区为32MB,每秒向内存缓冲区写入300条消息,内存缓冲区会不会经常被填满?经过这样的压力测试,就可以调试出合理的内存大小了。batch.sizebatch.size是批量数据的大小,默认值为16KB。一般情况下,您可以尝试将该参数调大一些。大家可以在自己的生产环境中使用发送消息的负载来测试一下。比如发送消息的频率是每秒300条,如果将“batch.size”调整为32KB或者64KB,是否可以提高发送消息的整体吞吐量。从理论上讲,增加batchsize可以让更多的数据缓存在里面,这样一个Request发送的数据量就会更多,吞吐量可能会提高。但是,它不可能无限大。如果太大,数据缓冲区会被Batch发送出去,所以发送消息的延迟会很高。例如,一条消息进入批处理,但在发送出去之前,批处理需要5秒才能填充到64KB。那么这个消息的延迟是5秒。因此需要根据生产环境发送消息的速率调整不同的batchsize,衡量最终输出的吞吐量和消息延迟,设置最合理的参数。如果一批linger.ms长时间填不满,此时需要引入另一个参数“linger.ms”。意思是Batch创建后,最多多久,不管Batch满不满,都要发送出去。比如一个batch.size是16kb,在某个非高峰期发送消息很慢。这就导致有可能Batch创建之后,会有消息陆续进来,但是已经来不及补16KB了。这个时候你就等吗?如果你现在把“linger.ms”设置为50ms,那么只要从创建batch开始过了50ms,即使没有装满16KB,也会发送出去。所以“linger.ms”决定一旦你的消息被写入一个批次,它将等待这段时间,然后它才会与批次一起发送出去。避免出现一批来不及填满,导致消息积压在内存中发不出去的情况。它应该与batch.size一起设置。比如假设一个batch是32KB,我们需要预估正常情况下需要多长时间才能组成一个batch,比如可能需要20ms才能组成一个batch。然后linger.ms可以设置为25ms,也就是说大部分batch会在20ms内填满,但是你的linger.ms可以保证即使遇到低峰,20ms也填不满一个batch,依然会满的。强制批处理在25毫秒后发送。如果你把linger.ms设置得太小,比如默认是0ms,或者你设置成5ms,可能会导致你的Batch设置成32KB,但往往32KB的数据不够用,5ms后,直接强行把Batch发出去,会导致你的Batch没用,永远填不上数据。max.request.size最大请求大小:max.request.size,这个参数决定了每次向Kafka服务器发送的最大请求数,同时也限制了你可以向Kafka服务器发送的最大消息数。您可以根据消息的大小灵活调整。比如发送的消息都是大消息,每条消息包含很多数据,一条消息可能有20KB。此时,你的batch.size是不是需要调大一点?比如设置一个512KB?那么你的buffer.memory应该更大吗?设置128MB?这样才能在大消息的场景下使用Batch对多条消息进行打包。这时候“max.request.size”可以适当调整,比如调整为5MB。retries和retries.backoff.ms“retries”和“retries.backoff.ms”决定了重试机制,即如果一个请求失败,可以重试几次,每次重试的间隔是多少毫秒。确认机制:acks此配置是指示生产请求何时被视为完成的确认值。特别是,有多少其他经纪人必须向他们的日志提交数据并向他们的领导者确认此信息。典型值包括:0:表示生产者从不等待来自代理的确认。此选项提供了最少的延迟但同时具有最大的风险(因为当服务器出现故障时,数据将会丢失)。1:表示确认leaderreplica已经收到数据。这个选择延迟很小,同时保证服务器确认接收成功。-1:生产者将得到所有同步副本都已收到数据的确认。同时,延迟是最大的。但是这种方法并不能完全消除丢失消息的风险,因为同步副本的数量可能是1个。如果要保证部分副本收到数据,应该在topic-level设置选项min.insync.replicas设置。min.insync.replicas该配置指定当生产者设置回复为“全部”(或“-1”)时,要写入成功的副本回复的最小数量。如果未满足此最小值,生产者将在强制提高持久性时引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)min.insync.replicas和acks。一个典型的情况是创建一个有3个副本的主题,将min.insync.replicas设置为2,并将acks设置为“all”。如果大多数副本未收到写入,这将确保生产者引发异常。消费者端配置和说明fetch.min.bytes:服务端每次fetch请求应该返回的最小字节数。如果没有返回足够的数据,请求将等待直到返回足够的数据。auto.commit.enable如果为true,consumer获取的消息的offset会自动同步到broker。当进程挂起时,提交的偏移量将被新的消费者使用。写在最后近年来,在AIOps领域高速发展的背景下,各行业对IT工具、平台能力、解决方案、AI场景和可用数据集的迫切需求呈爆发式增长。基于此,云智于2021年8月发布了AIOps社区,旨在竖起开源大旗,为各行业的客户、用户、研究人员和开发者打造一个活跃的用户和开发者社区,共同贡献和解决行业问题。问题,促进该领域的技术发展。社区先后开源了数据可视化与编排平台——FlyFish、运维管理平台OMP、云服务管理平台——Moore平台、Hours算法等产品。视觉编排平台-FlyFish:项目介绍:https://www.cloudwise.ai/flyF...Github地址:https://github.com/CloudWise-...Gitee地址:https://gitee.com/CloudWise/f...行业案例:https://www.bilibili.com/vide...部分大屏案例:可添加小助手(xiaoyuerwie)注:飞鱼。加入开发者交流群,与行业大咖1V1交流!您还可以通过小助手获取云智慧AIOps信息,了解飞鱼的最新进展!