?“ Kafka操作和维护控制平台Logikm”???更强大的控制功能???更有效的问题定位功能?更方便的集群操作和维护功能?更专业的资源治理?[TOC]
阅读本文后,您可能会获得以下知识
我们以前曾谈论过生产者的生产者,该唱片batch和ProducerBatch有什么区别?
RecordBatch是将消息存储在ProduceerBatch中的对象。此外,还有其他相关属性,例如重试,回调和其他相关属性。
创建新的ProducerBatch时,您需要同时构建MemoryRecordSbuilder。该对象可以理解为消息构造函数,所有消息都存储在此中。
以上源代码可以知道:
创建批处理后,自然需要写一条消息
源代码位置:
注意:当在这里写下消息时,第一个消息是从第62位写下的,因为前面的61B已由Batchheader(初始化时)预订。
要了解消息的格式,让我们看看消息是如何首先写的
defaultres#writeto
从源代码中,您可以知道消息格式是:
记录属性说明:
Varint是可变且自动的,可以有效地节省空间
标题属性说明:
同样,我也不会详细介绍。
当生产商即将发送时,批次将首先关闭。
关闭输出流附录流并压缩过程中的数据,它也将关闭它,即存储消息主体的输出流,然后将其称为lz4,然后在这里进行的实现类是kafkkalz4blockoutputstream
MemoryRecordSbuilder#CLOSSForreCordAppends Kafkkkkkartoutputstream#flush
WriteBlock()的压缩操作何时执行压缩操作,因此您应该知道,此时,记录被压缩。这只是记录。
我们已将记录消息设置为“ RecordBatchheader数据”。您还记得当我们写消息时,我们是否开始从位置61写作?
这个61B的空间是什么?
MemoryRecordSbuilder#WritedEfaultBatchheader
数据真正编写的地方
DefaultrecordBatch#WriteHeader
您可以看到CRC的计算,该计算在末尾计算,然后填充缓冲区,但这并不意味着CRC32放置在最后一个。CRC_OFFSET的位置为17。
RecordBatchHeader属性说明:
原始:https://juejin.cn/post/7121939963981398023