当前位置: 首页 > 科技观察

Kafka的生产者消费者机制+分区策略,你还不明白?

时间:2023-03-19 17:33:46 科技观察

本文转载自微信公众号《零后程序员小三》,作者003,转载本文请联系零后程序员小三公众号。什么是KafkaKafka最初是由Linkedin开发的。Linkedin于2010年向Apache基金会做出贡献,成为顶级开源项目。也是一个开源的【分布式流处理平台】,由Scala和Java编写,(也用作MQ系统,但不是纯消息系统)Kafka目前定位为分布式流处理平台,广泛用于其高吞吐量、持久性、水平可扩展性,并支持流数据处理。目前,越来越多的开源分布式处理系统如Cloudera、Storm、Spark、Flink等都支持生产者和消费者与Kafka的融合。在Kafka中,生产者(producer)向Broker发送消息,Broker将生产者发送的消息存储在磁盘中,消费者(Consumer)负责订阅并消费来自Broker的消息,消费者(Consumer))使用拉取模式从服务器拉取消息。Zookeeper负责整个集群的元数据管理和控制器选举。详情如下图所示。Kafka的生产者向Broker分区策略发送和订阅主题(Topic),生产者将消息发送到指定的主题,消费者消费负责订阅的主题。Kafka中的分区机制是什么?它将主题划分为多个分区,每个分区有多个副本。同一主题下不同分区的消息也不同。.生产者产生的每条消息只会发送到一个分区。Kafka中分区号是从0开始的,如果生产者向两个分区的topic发送消息,那么这条消息不在分区0,也就是在分区1。那么如何指定消息到指定分区呢?这个时候我们可以看看producer的发送逻辑。在此之前,我们需要知道一个叫做ProducerRecord的东西。这是什么?ProducerRecord是发送给BrokerValuepair的Key/value键,封装了基本的数据信息,简称PR。内部结构Topic(name)PartitionID(optional)Key(optional)ValueProducer发送逻辑1。如果指定了PartitionID,PR将被发送到指定的Partition。2.如果没有指定PartitionID,但是指定了Key,那么PR会根据hash(key)发送到对应的Partition。3.如果不指定PartitionID,不指定Key,PR会使用默认的round-Robin轮换训练发送给每个Partition(consumer消费partition默认是range模式)4.如果PartitionID和Key都存在指定了,PR只会发送到指定的Partition(此时的Key不起作用,代码逻辑Decision)注:Partition有多个副本,但是只有一个replicationLeader负责Partition和producer-消费者交互生产者到Broker的发送过程。Kafka的客户端向服务器发送数据(不是一个一个的)。它会经过内存缓冲区,通过KafkaProducer发送的消息会先进入客户端本地缓存,然后将消息收集到Batch中,然后一次性发送给Broker,这样性能可以改进了。producer常用配置#kafka地址,即broker地址bootstrap.servers#producer向leader发送数据时,可以通过request.required.acks参数设置数据可靠性等级,分别为0、1、全部。acks#请求失败,生产者会自动重试,指定时间为0,如果开启重试,会有重复消息的可能值会向服务器提交数据,默认值为16KBbatch.size#的默认值为0,消息立即发送,即使batch.size缓冲区空间未满,如果想减少请求次数,可以将linger.ms设置为大于#0,即发送的时间message一直保存在buffer中,如果超过了设定值,就会提交给server#通俗的解释就是,本该早就发送的message,被强制等待至少linger。一段时间内累积更多的消息,分批发送减量请求#如果batch满了或者linger.ms达到上限,如果batch满了或者linger.ms满足大小就发送其中一条,默认值为32MB。#如果buffer.memory设置的太小,可能会导致消息很快写入内存缓冲区,但是Sender线程来不及发送消息给Kafka服务器#内存缓冲区会很快被填满,并且一旦它满了,它就会阻塞用户线程,阻止你向Kafka写消息。#buffer.memory必须大于batch.size,否则会报申请内存不足的错误。不要超过物理内存。根据实际情况调整buffer.memory#key的serializer,序列化用户提供的key和value对象ProducerRecord,key.serializer必须设置,即使#message中没有指定key,也必须设置serializer一个实现org.apache.kafka.common.serialization.Serializer接口类,将#key序列化为字节数组。key.serializervalue.serializerKafka的consumer消费机制和partitionstrategy讲解consumer以什么方式从broker获取数据?为什么是pull模式而不是broker主动push?答案可见文章开头的图片。消费者使用Pullpull方式从broker的分区获取数据,那为什么是pull方式而不是push方式呢?拉动模式可以根据消费者的消费能力进行调整,不同的消费者有不同的表现。如果broker没有数据,consumer可以配置timeout的world,阻塞等待一段时间再返回。但是如果broker主动推送,推送的好处是可以快速处理消息,但是很容易让消费者无法处理,造成消息的堆积和延迟。消费者从哪个分区消费?我们知道一个topic有多个partition,一个consumergroup有多个消费者。那是怎么分配的?一个topic主题可以有多个consumer,因为有很多个partition分区(leaderpartition),一个leader分区可以被一个consumergroup中的一个consumer消费。那么消费者从哪个分区消费呢?策略1,round-robin(RoundRobinAssignor非默认策略)轮训,轮训是根据消费者组分配的,同一个消费者组听不同的topic。列出分区和所有消费者,因此消费者组中的订阅主题必须相同。如果主题不同,就会出现分配不均的情况。例如下面的例子:#有七个分区,同组有两个消费者topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6(partition)c-1:topic-p0/topic-p2/topic-p4/topic-p6(consumer1)c-2:topic-p1/topic-p3/topic-p5(consumer2)这样有什么缺点,如果在同一个consumergroup中,订阅的消息不同,在执行partitioning的时候不是round-robin分布,可能会导致partitions分布不均匀。比如现在有三个消费者C0、C1、C2,订阅了三个主题:t0、t1、t2。此时t0有1个分区(p0),t1有2个分区(p0,p1),t2有3个分区(p0,p1,p2)。消费者C0订阅了主题t0,消费者C1订阅了主题t0??和t1,消费者C2订阅了主题t0、t1和t2。因为是轮询机制,当C0订阅T0时,C1不能订阅T0,但是可以订阅T1,C2不能订阅T0,但是T1和T2都可以订阅。此时T2也只有C2订阅,其他C0和C1不可见。此时T2的消息就会被消费者C2消费。这种情况就是分配不均的问题。策略2.范围(RangeAssignor默认策略)范围根据主题分配。如果分布不均匀,第一个消费者会分配更多的partition,不影响一个消费者听不同的topic。这种策略的缺点是什么?对于一个topic,如果c-1多消耗一个partition,影响不会很大。如果有多个topic,那么对于每个topic,consumerC-1会多消费1个partition,topic越多,长时间消耗的partition越多,性能会下降。【面试题】Consumer消费者再分配策略和offset维护机制什么是rebalance操作Kafka如何将某个topic下的所有partition均匀分配给每个consumer,使消息消费速度达到最快,这就是balance。而再平衡(rebalancing)其实就是对分区进行重新分配,让分区的分布再次达到平衡状态。如下图,有A、B两个Consumer,当第三个成员C加入时,Kafka会触发Rebalance,重分配策略为A、B、C重新分区。Rebalance后的分布还是比较公平的。消费者实例获得了两个分区的消费权。当消费者在消费过程中突然宕机,恢复后会去哪里消费呢?消费者会记录偏移量,故障恢复后会从这里继续消费。那么offset记录在哪里呢?记录在zookeeper和本地,新版本默认在Kafka内置的topic中保证offset,名称为_consumer_offsets。本题目默认会有50个Partition,每个Partition有3份。分区数由参数offset.topic.num.partition配置。通过groupid的hash值和该参数的取模方法来判断一个consumergroup的consumeroffset保存到_consumer_offsetstopic的哪个分区。这是通过消费组名+主题+分区来确定唯一的offsetkey,从而得到对应的值。