当前位置: 首页 > 科技观察

生产者的实现逻辑——kafka知识体系(二)

时间:2023-03-19 21:40:47 科技观察

kafka是单独发送消息还是批量发送消息?kafka如何单独发送消息?kafka是顺序发送消息吗?生产者在什么情况下可能会频繁FullGC?Messages发送上帝视角的逻辑看发送消息的过程。Producer的设计和消费发送机制:1)Serializer:将消息对象序列化为字节数组,然后通过网络进行传输。2)Partitioner:计算消息发送到的具体分区;如果显示指定分区,则不使用分区程序。3)消息缓冲池:客户端的消息缓冲池,默认大小为32M,见参数buffer.memory。4)批量发送:缓冲池中的消息会按batch批量发送,默认批量大小为16KB,见参数batch.size。负载均衡设计:由于消息topic由多个partition组成,partition会平均分配给不同的broker。因此,为了有效利用broker集群的性能,提高消息的吞吐量,producer可以通过随机或hash的方式将消息平均发送到多个partition,以实现负载均衡。分区策略:轮询策略,默认策略为随机策略。在实际性能上,不如轮询策略保持消息key顺序的策略。一旦消息定义了一个Key,那么就可以保证所有具有相同Key的消息都进入同一个分区,因为每个分区下的消息处理是顺序的。KafkaProducer源码//ClientID。创建KafkaProducer时,可以通过client.id定义clientId。如果不指定,则默认producer-seq,seq在这个过程中递增。强烈建议客户端显示指定的clientId。privatefinalStringclientId;//metrics的相关存储容器,如消息体大小、发送时间等与监控相关的指标。finalMetricsmetrics;//分区负载均衡算法,由参数partitioner.class指定。privatefinalPartitionerpartitioner;//调用send方法发送的最大请求大小,包括key和序列化消息体的总大小不能超过这个值。由参数max.request.size设置。privatefinalintmaxRequestSize;//生产者缓存占用内存的总大小,由参数buffer.memory设置。privatefinallongtotalMemorySize;//主题路由信息等元数据信息,由KafkaProducer自动更新。privatefinalMetadatametadata;//消息记录累加器privatefinalRecordAccumulatoraccumulator;//用于封装消息发送逻辑,即向broker发送消息的处理逻辑。privatefinalSendersender;//消息发送的后台线程,独立线程,内部使用Sender向broker发送消息。privatefinalThreadioThread;//压缩类型,默认不开启压缩,可以通过参数compression.type配置。可能的值:none、gzip、snappy、lz4、zstd。privatefinalCompressionTypecompressionType;//错误信息收集器作为监控的metric。privatefinalSensorerrors;//用于获取系统时间或线程休眠等privatefinalTimetime;//用于序列化消息的key。privatefinalExtendedSerializerkeySerializer;//SerializervalueSerializerprivatefinalExtendedSerializervalueSerializer;//生产者配置信息。privatefinalProducerConfigproducerConfig;//最大阻塞时间,当producer使用的buffer达到指定值时,此时将阻塞消息发送,最大等待时间可通过参数max.block.ms设置。privatefinallongmaxBlockTimeMs;//配置控制客户端等待请求响应的最长时间。如果在超时到期之前没有收到响应,客户端将在需要时重新发送请求,或者在重试次数耗尽时使请求失败。privatefinalintrequestTimeoutMs;//生产者端的拦截器在消息发送前进行一些自定义的处理。privatefinalProducerInterceptorsinterceptors;//维护api版本的相关元信息,该类只能在kafka内部使用。privatefinalApiVersionssapiVersions;//kafka消息事务管理器。privatefinalTransactionManagertransactionManager;//Kafka生产者事务上下文初始结果。privateTransactionalRequestResultinitTransactionsResult;KafkaProducer具有以下特点:KafkaProducer是线程安全的,可以被多线程使用。KafkaProducer内部包含一个缓冲池,用于存储要发送的消息,即ProducerRecord队列。同时会开启一个IO线程,将ProducerRecord对象发送给Kafka集群。KafkaProducer的消息发送API的send方法是异步的。它只负责将要发送的消息ProducerRecord发送到缓冲区,立即返回,并返回一个结果凭证Future。acks参数的作用KafkaProducer提供了一个核心参数acks来定义消息“提交”的条件(标准),也就是Broker端向客户端承诺已经提交的条件。可选值如下:0:只调用KafkaProducer的send方法返回后,认为全部成功或-1:表示消息不仅要求Leader节点已经存储了消息,而且还要求它的所有副本(准确的说是ISR中的节点)都被认为是提交了,然后返回给客户端提交成功。这是最严格的持久化保证,当然也是最低的性能。1:表示消息写入Leader节点后才能返回给客户端。retries参数的作用是kafka在生产端提供的另一个核心属性,用于控制消息发送失败后的重试次数。设置为0表示不重试,重试可能会导致消息在发送方重复。从消息发送接口:Futuresend(ProducerRecordrecord);Futuresend(ProducerRecordrecord,Callbackcallback);从上面的API我们可以知道,用户在使用KafkaProducer发送消息时,首先需要将要发送的消息封装成一个ProducerRecord,返回一个Future对象,典型的Future设计模式。Kafka消息追加流程KafkaProducer的send方法并不直接向broker发送消息。Kafka异步发送消息,即分解为两步。send方法的职责是将消息追加到内存中(在分区的缓存队列中),然后有专门的Send线程将缓存中的消息异步批量发送给KafkaBroker。主要方法是KafkaProducer#doSend将消息附加到生产者的发送缓冲区,其实现类为:RecordAccumulator。我们先看一下Kafka向内存写入消息的流程图:Sender线程到此结束,我们看到调用send方法时,实际上只是发送到生产者客户端的服务内存中。还没有联系到经纪人。Kafka生产者客户端后台会启动一个线程不断轮询消息batch存放的区域,将消息发送给Broker。消息batch的内存结构及分配根据上面的源码我们可以知道,每个ProducerBatch都是一块内存,大小为batch.size字节。并且使用了池化技术。缓冲池的内存持有类是BufferPool。我们先看看BufferPool的成员:publicclassBufferPool{//总内存大小privatefinallongtotalMemory;//每个内存块的大小,即batch.sizeprivatefinalintpoolableSize;//申请并返回内存方法privatefinalReentrantLocklock的同步锁;//释放内存块privatefinalDequefree;//需要等待空闲内存块的事件privatefinalDequewaiters;/**TotalavailablememoryisthesumofnonPooledAvailableMemoryandthenumberofbytebuffersinfree*poolableSize.*///缓冲池还没有分配新申请内存的空闲内存block就是从这里获取内存值privatelongnonPooledAvailableMemory;//...}从BufferPool的成员可以看出,缓冲池其实是由ByteBuffers组成的。BufferPool持有这些内存块,并存储在成员free中,free的总大小受totalMemory限制,nonPooledAvailableMemory表示缓冲池中还有多少内存没有分配。Batch消息发送时,会将自己持有的内存块返回free,这样后续的Batch在申请内存块时就不再创建新的ByteBuffer,直接从free中取,从而避免了内存块问题被JVM回收。创建内存块的过程如下:返回内存块的逻辑过程如果返回的内存块大小等于batchSize,则将其清空,加入到bufferpool的free中,即会返回缓冲池,防止JVMGC回收内存块。如果不相等,只需将内存大小与未分配和空闲内存大小值相加即可,无需归还内存。等待JVMGC回收,最后唤醒等待空闲内存的线程。Java生产者如何管理TCP连接为什么要使用TCP?ApacheKafka中的所有通信都基于TCP,而不是HTTP或其他协议。无论是生产者、消费者还是代理之间的通信都是如此。从社区的角度来看,在开发客户端时,可以利用TCP本身提供的一些高级功能,例如多路复用请求和同时轮询多个连接的能力。TCP的多路复用请求会在一个物理连接上创建多个虚拟连接,每个虚拟连接负责传输其对应的数据流。其实严格来说TCP是不能多路复用的,它只是提供可靠的消息传递语义保证,比如丢失消息的自动重传。而目前已知的HTTP库在很多编程语言中都有些粗糙。TCP连接是什么时候创建的?TCP连接在创建KafkaProducer实例时建立。当创建KafkaProducer实例时,生产者应用程序将在后台创建并启动一个名为Sender的线程。当Sender线程开始运行时,它首先会创建一个与BrokerConnection的链接。Propertiesproperties=newProperties();properties.put("bootstrap.servers","localhost:9092");properties.put("key.serializer",StringSerializer.class.getName());properties.put("value.serializer",StringSerializer.class.getName());//try-with-resources//创建KafkaProducer实例时,会在后台创建并启动Sender线程。Sender线程开始运行时,会先与Broker建立TCP连接,VALUE);Callbackcallback=(metadata,exception)->{};producer.send(record,callback);}bootstrap.servers是Producer的核心参数之一,指定连接时要连接的Broker的地址生产者开始。如果bootstrap.servers指定了1000个Brokers,那么Producer启动时会先与这1000个Brokers建立连接。因此,不建议将集群中的所有Broker信息都配置到bootstrap.servers中。通常,配置3到4个足够的Producer。一旦连接到集群中的任何Broker,就可以获得Broker信息(元数据请求)TCP连接也可能在两个地方创建:一个是在更新元数据之后,另一个是在发送消息时。Producer更新集群的元数据后,如果发现当前没有到某些Brokers的连接,Producer会创建一个TCP连接【场景1】当Producer试图向不存在的topic发送消息时,Broker会告诉Producer该主题不存在。如果存在,Producer会向Kafka集群发送元数据请求,尝试获取最新的元数据信息,并与集群中的所有Broker建立TCP连接。【场景二】Producer通过metadata.max.age.ms参数周期性更新元数据信息。默认值为300000,即5分钟。当Producer要发送消息时,Producer发现与目标Broker没有连接(取决于负载均衡算法),也会创建TCP连接。什么时候关闭TCP连接?Producer端关闭TCP连接有两种方式:用户主动关闭,Kafka自动关闭。【用户主动关闭】广义的主动关闭,包括用户调用kill-9杀死Producer,最推荐的方式:producer.close()【Kafka自动关闭】Producer端参数connections.max.idle.ms,默认值为540000,即如果9分钟内没有通过TCP连接请求,Kafka会主动关闭TCP连接。connections.max.idle.ms=-1会禁用这个机制,TCP连接会变成Kafka创建的永久长连接,Socket连接会开启keepalive。【注意】关闭TCP连接的发起者是Kafka客户端,属于被动关闭场景。被动关闭的后果是会产生大量的CLOSE_WAIT连接。Producer或Client没有机会明确观察到TCP连接已中断。总结现在我们可以回答前三个问题了。1、kafka是单独发送消息还是批量发送消息?通常,它们是分批发送的。封装成ProducerBatch发送。2、kafka如何发送单条消息?只能设置单生产者单线程同步调用send方法。3.Kafka是顺序发送消息吗?不行,如果需要顺序,必须设置key,producer是单线程的。4、生产者在什么情况下会频繁FullGC?如果你的消息大小大于batchSize,你不会循环从free中获取分配的内存块,而是重新创建一个新的ByteBuffer,并且这个ByteBuffer不会返回到缓冲池(被JVMGC回收)。如果此时nonPooledAvailableMemory小于消息体,free中的空闲内存块将被销毁(由JVMGC回收),以便缓冲池中有足够的内存空间提供给用户申请,这些动作会导致频繁的GC问题。所以需要根据业务消息的大小适当调整batch.size的大小,避免频繁的GC。