当前位置: 首页 > 科技观察

好有深度的文章!图解KafkaProducer内存池架构设计

时间:2023-03-15 20:29:35 科技观察

在阅读本文之前,希望大家能够思考以下问题,带着问题阅读文章会得到更好的效果。发送消息时,当Broker挂了,消息体是否还能写入消息缓存?当消息还在缓存中时,如果Producer客户端挂了,消息会不会丢失?当最新的ProducerBatch仍有空闲内存,但下一条消息太大无法添加上一条时,Batch中会发生什么?那么创建ProducerBatch时应该分配多少内存呢?什么是消息累加器RecordAccumulatorkafka为了提高Producer客户端的发送吞吐量和性能,它会选择暂时缓存消息,等到满足一定条件后再批量发送,可以减少网络请求,提高吞吐量。RecordAccumulator类缓存此消息。上图是整个消息存储的缓存模型,接下来我们会一一讲解。消息缓存模型上图展示了消息缓存模型,产生的消息暂存在其中。对于每条消息,我们根据TopicPartition维度将它们放入不同的Deque队列中。TopicPartition是相同的,并且将在同一个Deque中。ProducerBatch:表示同一批消息。当消息真正发送到Broker端时,是分批发送的。该批次可能包含一条或多条消息。如果没有找到该消息对应的ProducerBatch队列,则创建一个队列。找到ProducerBatch队列尾部的Batch,发现Batch还能持有消息,就直接把消息塞进这个Batch里找到ProducerBatch队列尾部的Batch,发现Batch中剩余内存不足以容纳消息,然后它将创建一个新的批次。当消息发送成功后,Batch将被释放。ProducerBatch的内存大小那么在创建ProducerBatch的时候,应该分配多少内存呢?先说结论:当消息预估内存大于batch.size时,根据消息预估内存创建,否则根据batch.size的大小创建(默认16k).我们看一段代码,是在创建ProducerBatch的时候估算内存RecordAccumulator的大小#append/***公众号:石珍珍杂货店*微信:szzdzhp001**///求最大batch.size和batch中这条消息的总内存大小Valueintsize=Math.max(this.batchSize,AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic,compression,key,value,headers));//申请内存buffer=free.allocate(size,最大时间块);假设当前生产的消息为M,只是消息M找不到可以存储该消息的ProducerBatch(不存在或已满),此时需要创建一个新的ProducerBatch。与batch.size的默认大小相比,估计的消息大小为16384(16kb)。请求的内存大小的最大值。那么,这个消息的预测是如何预测的呢?仅仅是消息体的大小吗?DefaultRecordBatch#estimateBatchSizeUpperBound估计需要的Batchsize,这是一个估计值,因为它没有从额外的开销考虑压缩算法/***使用给定的key和value来获取只有一条记录的batchsize的上限。*这只是一个估计值,因为它没有考虑所使用的压缩算法的开销。**/staticintestimateBatchSizeUpperBound(ByteBufferkey,ByteBuffervalue,Header[]headers){returnRECORD_BATCH_OVERHEAD+DefaultRecord.recordSizeUpperBound(key,value,headers);}估计这条消息的大小M+一个RECORD_BATCH_OVERHEAD的大小是BATCH_OVERHEAD的一些基本元素信息,一共占用61B。消息M的大小不仅仅是消息体的大小,总的大小=(key,value,headers)大小+MAX_RECORD_OVERHEADMAX_RECORD_OVERHEAD:一个消息头占用的最大空间,最大为21B也就是说Create一个ProducerBatch,至少83B。比如我发送一条消息“1”,估计大小是86B,这是对比batch.size(默认16384)的最大值。然后申请内存的时候取最大值16384。关于Batch的结构和messages的结构,我们会在单独的一篇文章中进行讲解。内存分配我们都知道RecordAccumulator中的缓存大小是一开始定义的,由buffer.memory控制,默认是33554432(32M)。当生产速度大于发送速度时,可能会出现Producer写阻塞。而且ProducerBatch的频繁创建和释放会导致频繁的GC。所有kafkas中都有一个bufferpool的概念。这个bufferpool会被重用,但是只有固定的(batch.size)size可以使用bufferpool。PS:后面的16k指的是batch.size的默认值。批量创建和释放1.内存16K缓冲池①中有可用内存。当创建一个batch时,会去bufferpool中获取一块位于队首的内存ByteBuffer以供使用。②.消息发送完毕,Batch释放后,会将ByteBuffer放在缓冲池的尾部,调用ByteBuffer.clear清除数据。2.16K内存①的缓存池中没有可用内存。创建batch时,去非缓存池中的内存中获取一部分内存用于创建batch。注意:这里说的batch获取内存其实就是让nonPooledAvailableMemorynonPooledAvailableMemory减少16K内存,然后Batch才能正常创建。不要误以为记忆转移真的发生了。②.消息发送完毕,Batch释放后,会将ByteBuffer放在缓冲池的尾部,调用ByteBuffer.clear清除数据,以便下次重复使用。.创建batch时,去nonPooledAvailableMemory内存中获取一部分内存用于创建batch。注意:这里说的batch获取内存其实就是从nonPooledAvailableMemory中减少相应的内存,然后正常创建batch就可以了,不要误以为真的发生了内存的转移。②.消息发送后释放Batch,纯粹是将新释放的Batch内存大小添加到nonPooledAvailableMemory中。当然这个batch会被GC4次丢弃。内存不是16K,非bufferpool内存不够①。先尝试将cachepool中的内存逐个释放到non-bufferpool中,直到non-bufferpool中的内存足够创建batch②。创建Batch时,去nonPooledAvailableMemory内存中获取一部分内存用于创建Batch。注意:这里说的Batch的获取内存,其实就是减少nonPooledAvailableMemory对应的内存,然后Batch正常创建就可以了,不要误以为真的发生了内存的转移。③.消息发送后释放Batch,纯粹是将新释放的Batch内存大小添加到nonPooledAvailableMemory中。当然,这批会被GC丢弃。例如:接下来,我们需要创建一个48k的批处理。因为超过了16k,所以我们需要在non-bufferpool中分配内存,但是non-bufferpool中当前可用内存为0,无法分配。这时,它会尝试将缓存池中的部分内存释放给非缓存池。如果释放第一个ByteBuffer(16k)不够,则继续释放第二个,直到释放完3个字节后总共释放48k。当发现此时内存足够时,创建一个Batch。注意:我们这里所说的非缓存池中的内存分配只是指内存数量的增减。问答发送消息时,当Broker挂了,消息体还能写入消息缓存吗?当Broker挂掉后,Producer会提示如下警告??,但是在消息发送过程中仍然可以将消息体写入消息缓存,只是写入缓存。警告[ProducerclientId=console-producer]无法建立与节点0(/172.23.164.192:9090)的连接。Broker可能不可用当最新的ProducerBatch还有freememory,但是下一个消息很大,不够添加到上一个batch中怎么办?然后将创建一个新的ProducerBatch。那么创建ProducerBatch时应该分配多少内存呢?如果触发创建ProducerBatch的消息的估计大小大于batch.size,它将使用估计的内存创建。否则,使用batch.size创建。还有一个问题需要大家思考:当消息还在缓存中的时候,如果Producer客户端挂了,消息会不会丢失?