当前位置: 首页 > 后端技术 > Java

Kafka成长记6:Producer如何将消息放入到内存缓冲区(上)

时间:2023-04-01 22:31:22 Java

Kafka成长笔记6:Producer如何将消息放入内存缓冲区?如下图所示:本节我们继续分析发送消息的内存缓冲区的原理——RecordAccumulator.append()。消息是如何放入内存缓冲区的?在doSend中,accumulator.append()是在拉取元数据、消息的初始序列化方法、消息的路由策略之后。如下代码所示:(去除冗余日志和异常处理,截取核心代码)privateFuturedoSend(ProducerRecordrecord,Callbackcallback){TopicPartitiontp=null;try{//拉取Metadata,消息初始序列化方法,消息路由策略longwaitedOnMetadataMs=waitOnMetadata(record.topic(),this.maxBlockTimeMs);longremainingWaitMs=Math.max(0,this.maxBlockTimeMs-waitedOnMetadataMs);byte[]serializedKey=keySerializer.serialize(record.topic(),record.key());byte[]serializedValue=valueSerializer.serialize(record.topic(),record.value());intserializedSize=Records.LOG_OVERHEAD+Record.recordSize(serializedKey,serializedValue);确保有效记录大小(序列化大小);tp=newTopicPartition(record.topic(),分区);长时间戳=record.timestamp()==null?time.milliseconds():记录时间戳();回调interceptCallback=this.拦截器==nul我?回调:newInterceptorCallback<>(回调,this.interceptors,tp);//将路由结果和初步序列化消息放入消息内存缓冲区如果(result.batchIsFull||result.newBatchCreated){this.sender.wakeup();}返回结果。未来;}catch(Exceptione){抛出e;}//省略其他各种异常捕获}accumulator.append()主要是将路由结果和初步序列化后的消息放入消息内存缓冲区在分析如何将消息放入内存缓冲区之前,需要回顾一下其内部的基本结构.之前分析组件的时候,我们初步分析了RecordAccumulator的大致结构,如下图所示:1)设置一些参数batchSize,totalSize,retryBackoffMs,lingerMs,compression等2)初始化一些数据结构,比如batchesisanewCopyOnWriteMap<>()3)初始化BufferPool和IncompleteRecordBatches回顾完RecordAccumulator组件,我们来看看如何将消息放入内存缓冲区的数据结构中。publicRecordAppendResultappend(TopicPartitiontp,longtimestamp,byte[]key,byte[]value,Callbackcallback,longmaxTimeToBlock)throwsInterruptedException{//我们跟踪追加线程的数量以确保我们不会错过//中止不完整批次()。appendsInProgress.incrementAndGet();try{//检查我们是否有正在进行的批处理Dequedq=getOrCreateDeque(tp);synchronized(dq){if(closed)thrownewIllegalStateException("生产者关闭后无法发送。");RecordAppendResultappendResult=tryAppend(timestamp,key,value,callback,dq);如果(appendResult!=null)返回appendResult;}//我们没有进行中s记录批尝试分配一个新的批intsize=Math.max(this.batchSize,Records.LOG_OVERHEAD+Record.recordSize(key,value));log.trace("为主题{}分区{}分配一个新的{}字节消息缓冲区",size,tp.topic(),tp.partition());ByteBufferbuffer=free.allocate(size,maxTimeToBlock);synchronized(dq){//需要检查生产者在抢到出队锁后是否再次关闭。if(closed)thrownewIllegalStateException("生产者关闭后无法发送。");RecordAppendResultappendResult=tryAppend(timestamp,key,value,callback,dq);if(appendResult!=null){//其他人找到了我们一批,返回我们等待的那个!希望这种情况不会经常发生......free.deallocate(buffer);返回追加结果;}MemoryRecords记录=MemoryRecords.emptyRecords(缓冲区,压缩,this.batchSize);RecordBatchbatch=newRecordBatch(tp,records,time.milliseconds());FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,callback,ti??me.milliseconds()));dq.addLast(批处理);不完整的。添加(批次);returnnewRecordAppendResult(future,dq.size()>1||batch.records.isFull(),true);}}最后{appendsInProgress.decrementAndGet();}}整个方法的上下文看起来比较有逻辑性,涉及到很多数据结构。我们一步一步来分析。第一次看的话,大致可以梳理出以下脉络:1)getOrCreateDeque方法应该创建一个双端队列,队列的每个元素不是单个消息Record,而是消息的集合记录批次。2)free.allocate应该是在内存缓冲区中分配内存3)tryAppend应该是将消息放入内存创建一个队列用于存储消息集合在将消息放入内存缓冲区之前,首先通过getOrCreateDeque创建一个存储A排队等待消息集合。代码如下:privatefinalConcurrentMap>batches;publicRecordAccumulator(intbatchSize,longtotalSize,CompressionTypecompression,longlingerMs,longretryBackoffMs,Metricsmetrics,Timetime){//省略...this.batches=newCopyOnWriteMap<>();//Omit...}/***获取给定主题分区的双端队列,必要时创建它。*/privateDequegetOrCreateDeque(TopicPartitiontp){Dequed=this.batches.get(tp);如果(d!=null)返回d;d=newArrayDeque<>();双端队列previous=this.batches.putIfAbsent(tp,d);if(previous==null)返回d;elsereturnprevious;}创建的内存结构可以看做是一个变量batches,也就是一个CopyOnWriteMap。这个数据结构之前我们的组件图已经初步分析过了。结合这段代码,不难理解它的来龙去脉:这个map主要是以topic分区信息为key,value为一个queue。核心数据结构是RecordBatch。由于是第一次向主题分区发送消息,因此值为空,队列需要初始化,否则说明数据已经发送到这个topic的分区,值不为空,直接返回之前的队列。由于我们是第一次向test-topic发送消息,所以可以得到下图中的数据结构:之后执行了一个锁逻辑。如前所述,tryAppend应该将消息放入内存中。但是由于队列刚刚创建,deque.peekLast();必须为空,所以这段锁定的代码将不会被执行。synchronized(dq){if(closed)thrownewIllegalStateException("生产者关闭后无法发送。");RecordAppendResultappendResult=tryAppend(timestamp,key,value,callback,dq);如果(appendResult!=null)返回appendResult;}privateRecordAppendResulttryAppend(longtimestamp,byte[]key,byte[]value,Callbackcallback,Dequedeque){RecordBatchlast=deque.peekLast();if(last!=null){FutureRecordMetadatafuture=last.tryAppend(timestamp,key,value,callback,ti??me.milliseconds());如果(未来==null)last.records.close();否则返回newRecordAppendResult(future,deque.size()>1||last.records.isFull(),false);}返回空值;但是这里你会发现代码一个明显的特点,使用同步锁和线程安全的内存结构CopyOnWriteMap,这些很明显是线程安全的控件。为什么?因为同一个Producer可以使用多个线程来发送消息,所以需要考虑很多线程安全的事情。为什么选择CopyOnWriteMap而不是ConcurrentHashMap?你可以考虑一下。(这里提醒一下,JDK成长笔记中提到,CopyOnWriteMap底层是copy-on-write,适用于读多写少的场景。)使用synchronized加锁代码块,分段加锁不暴力。添加同步。这也是一个使用的亮点。如果你写在最后,你会发现你会在中间件中看到concurrent包下的大量组件。您可能很少在工作中使用它们。这些组件的使用值得我们在研究中间件源码的时候学习。.你必须更多地思考为什么,而不仅仅是它是什么以及如何使用它。这个想法需要刻意培养,希望大家慢慢培养。好了,今天的内容就到这里。有同学反映每一节都太干了,真的很干!好像有时候会比较吃力,所以后面的章节会尽量避免几万字的大篇章,会控制在6000字左右。另外,除了成长故事,偶尔也会分享自己的故事和在行业中遇到的事情。希望大家能从我的经历中得到另一种成长和收获,比如我如何学习和提高技术?我该如何画画?我怎么做技术分享等等。本文由博客多发平台OpenWrite发布!