当前位置: 首页 > 科技观察

10张图告诉你RocketMQ如何保存消息

时间:2023-03-12 04:05:57 科技观察

大家好,我是君哥,今天给大家分享RocketMQ如何保存消息。1.简介首先在RocketMQ集群中创建一个名为MyTestTopic的Topic,并如下图配置:这里对图中几个参数进行解释:writeQueueNums:客户端可以向多少个队列发送消息;readQueueNums:客户端消费消息时可以从多少个队列中拉取;perm:当前topic的读写权限,2只允许读,4只允许写,6允许读写,默认为6。RocketMQ主要有三个消息相关的文件:commitlog、consumequeue和index。下面是这些文件的默认路径:[root@xxxstore]/root/store[root@xxxstore]abortcheckpointcommitlogconfigconsumequeueindexlock上面的writeQueueNums参数控制了consumequeue中的文件数。作为测试,我向主题MyTestTopic发送了100条消息,这些消息保存在提交日志文件中。consumequeue文件如下:[root@xxxMyTestTopic]/root/store/consumequeue/MyTestTopic[root@xxxMyTestTopic]01234567可以看到,每一个consumequeue都保存在consumequeue目录下Topic一个目录,用于保存本Topic的consumequeue文件。consumequeue文件为每个主题创建一个基于偏移量的索引。索引文件保存了消息的基于key的HASH索引。2、commitlog文件commitlog是RocketMQ保存消息的文件。commitlog不按Topic划分,所有Topic消息写入同一个commitlog。RocketMQ为了追求高效写入,采用了磁盘顺序写入。commitlog文件大小默认为1G,可以通过参数mappedFileSizeCommitLog修改。下面是服务器磁盘上保存的commitlog文件(文件大小1G):[root@xxxcommitlog]/root/store/commitlog[root@xxxcommitlog]0000000000000000000000000000001073741824如果配置的mappedFileSizeCommitLog参数为1048576,也就是1M,那么theserverdisk保存的commitlog文件如下:[root@xxxcommitlog]/root/store/commitlog[root@xxxcommitlog]000000000000000000000000000000000104857600000000000002097152000000000000031457280000000000000419430400000000000005242880可以看到:commitlog文件的命名以保存在文件中的消息最小的偏移量来命名,后一个文件的名称是前一个文件名加上文件大小。例如上面的前两个文件,第一个文件中的最小消息偏移量为0,第二个文件中的最小消息偏移量为1048576。这样通过偏移量查找消息时,可以先使用二分查找找到消息所在的文件,然后用偏移量减去文件名就可以轻松找到消息在文件中的物理地址。下面创建文件的代码可以看到commitlog文件的名称:StringnextNextFilePath=this.storePath+File.separator+UtilAll.offset2FileName(createOffset+this.mappedFileSize);返回doCreateMappedFile(nextFilePath,nextNextFilePath);}publicstaticStringoffset2FileName(finallongoffset){finalNumberFormatnf=NumberFormat.getInstance();nf.setMinimumIntegerDigitsMaximum(20);nf.set(0);nf.setGroupingUsed(false);返回nf.format(offset);}为了让commitlog操作更高效,RocketMQ使用mmap将磁盘上的日志文件映射到用户态的内存地址,减少了日志文件从磁盘到用户态内存之间的数据拷贝。代码如下:if(messageStore.getMessageStoreConfig().isTransientStorePoolEnable()){mappedFile.init(req.getFilePath(),req.getFileSize(),messageStore.getTransientStorePool());}catch(RuntimeExceptione){log.warn("使用默认实现。");mappedFile=newMappedFile(req.getFilePath(),req.getFileSize(),messageStore.getTransientStorePool());}}else{mappedFile=newMappedFile(req.getFilePath(),req.getFileSize());}写入消息时,如果isTransientStorePoolEnable方法返回true,则消息数据先写入堆外内存,然后异步线程将堆外内存数据Flush到PageCache,如果返回false则直接写入PageCache。之后PageCache中的数据会根据flushing策略持久化到磁盘。如下图:对应代码如下:publicCompletableFutureasyncPutMessage(finalMessageExtBrokerInnermsg){putMessageLock.lock();尝试{MappedFilemappedFile=this.mappedFileQueue.getLastMappedFile();结果=mappedFile.appendMessage(msg,this.appendMessageCallback,putMessageContext);switch(result.getStatus()){casePUT_OK:break;caseEND_OF_FILE:unlockMappedFile=mappedFile;mappedFile=this.mappedFileQueue.getLastMappedFile(0);if(null==mappedFile){log.error("创建映射文件2错误,主题:"+msg.getTopic()+"clientAddr:"+msg.getBornHostString());返回CompletableFuture.completedFuture(新PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED,结果));}result=mappedFile.appendMessage(msg,this.appendMessageCallback,putMessageContext);休息;}}最后{putMessageLock.unlock();}CompletableFutureflushResultFuture=submitFlushRequest(结果,味精);}无论先写堆外内存还是直接写PageCache,文件数据都会映射到MappedByteBuffer,如下图所示:主要用于读取消息,堆外内存用于写入消息。这样在一定程度上实现了读写分离,降低了PageCache的写入压力。再看文件映射的代码,如下:privatevoidinit(finalStringfileName,finalintfileSize)throwsIOException{this.fileName=fileName;this.fileSize=fileSize;this.file=newFile(文件名);this.fileFromOffset=Long.parseLong(this.file.getName());尝试{this.fileChannel=newRandomAccessFile(this.file,"rw").getChannel();this.mappedByteBuffer=this.fileChannel.map(MapMode.READ_WRITE,0,fileSize);这里使用了Java中FileChannel的map方法来实现mmap。有一个细节需要注意:MappedFile创建后,文件会被预热。目的是提前将PageCache加载到内存中,防止发生缺页时读写数据再次加载,影响性能。代码如下:).getFlushDiskType(),this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());}最后附上写commitlog的UML类图:3.前面在consumequeue文件中提到,所有Topic消息都写到同一个commitlogfile,如果直接在commitlog文件中查找messages,只能从文件开头查找,肯定会很慢。所以RocketMQ引入了consumequeue,它是根据Topic来保存offset的。从consumequeue文件的保存目录也可以看出:[root@xxxMyTestTopic]/root/store/consumequeue/MyTestTopic[root@xxxMyTestTopic]01234567Consumequeue会为每个Topic创建一个目录,在每个Topic目录下为每个consumequeue创建一个目录,比如上面MyTestTopic主题下有8个consumequeue。每个consumequeue目录保存了这个队列的文件内容。以上面第七个目录为例:[root@xxx7]/root/store/consumequeue/MyTestTopic/7[root@xxx7]00000000000000000000consumequeue的文件结构如下:前8个字??节保存消息在commitlogOffset,中间4字节保存消息大小,最后8字节保存消息中tag的hashcode。为什么要在此处保存标签的哈希码?如果一个Consumer订阅了TopicA中的Tag1和Tag2这两个标签,那么这个Consumer的订阅关系如下图所示:可以看到,订阅关系对象封装了订阅的Topic、标签和hashcode集合标签。当Consumer发送拉取消息请求时,会将订阅关系传递给Broker(Broker将其解析为一个SubscriptionData对象)。Broker在使用consumequeue获取消息时,首先判断最后一个8字节的taghashcode是否在SubscriptionData的codeSet中,如果不在则跳过,如果存在则根据offset从commitlog中获取消息并将其返回给消费者。如下图:参考如下代码:subscriptionData.getCodeSet().contains(tagsCode.intValue());}和commitlog一样,consumequeue也会使用mmap映射来为MappedFile存储对象。4、索引文件为了支持根据消息的某个属性进行查询,RocketMQ引入了索引文件。索引文件结构如下图所示:主要由三部分组成:IndexHeader、HashSlog和Indexentry。和commitlog一样,Index文件也会使用mmap来映射MappedFile存储对象。(1)IndexHeaderIndexHead由以下6个属性组成,定义在IndexHeader类中:1.beginTimestamp:索引文件中消息的最小存储时间。2.endTimestamp:索引文件中的最大消息存储时间。3.beginPhyoffset:索引文件中包含的消息中最小的commitlog偏移量。4.endPhyoffset:索引文件中包含的消息中的最大提交日志偏移量。5.hashSlotcount:索引文件包含的哈希槽数。6.indexCount:索引文件中包含的索引条目数。(2)HashSlogHashSlot是JavaHashMap中的哈希槽,默认为500万。每个HashSlot用4个字节的int类型保存最后一个Index条目的位置。注意:为什么上面说的是最后一个Index条目呢?因为Index表项存储了key的hashcode,如果出现hash冲突,HashSlot使用链表的方式解决,相同Hash值的前一个表项位置会保存在Index表项中。如下图所示:依次写入三个key为key1、key2、key3的消息,这三个key具有相同的hashcode。写key1时,hash槽保存key1报文的索引项位置,写key2时,hash槽保存key2报文的索引项位置,同时key2的索引项中的prevIndexmessage保存key1消息的索引入口位置,writekey3时,hash槽保存key3消息的索引入口位置,key3消息索引入口的prevIndex保存key2消息的索引入口位置。(3)索引条目索引条目目录由4个属性组成:1.keyhashcode:查找消息的key的hashcode。2.phyOffset:消息在commitlog文件中的物理偏移量。3.timediff:消息存储时间和beginTimestamp的差值。key查找消息时,如果key相同,还要看timediff是否在interval范围内。如果不在时间范围内,则不予退还。参考如下代码:phyOffsetRead);}4.prevIndex:key索引发生哈希冲突后,保存上一个具有相同哈希码的入口位置。默认情况下有2000万个索引条目。(4)搜索过程整个搜索过程如下图所示:详细代码见IndexFile类的selectPhyOffset方法。5、文件构建看到这里,你可能会有一个疑问,consumequeue和index文件的内容是什么时候写入的呢?当MessageStore被初始化时,一个线程ReputMessageService将被启动。这个线程的逻辑是在无限循环中每1ms执行一次,从commitlog中获取消息写入到consumequeue和index文件中。参考如下代码:DispatchRequestdispatchRequest=DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(),false,false);intsize=dispatchRequest.getBufferSize()==-1?dispatchRequest.getMsgSize():dispatchRequest()BufferSize;if(dispatchRequest.isSuccess()){if(size>0){DefaultMessageStore.this.doDispatch(dispatchRequest);}}}publicvoiddoDispatch(DispatchRequestreq){for(CommitLogDispatcherdispatcher:this.dispatcherList){dispatcher.dispatch(req);}}下面是dispatcherList的定义:this.dispatcherList=newLinkedList<>();this.dispatcherList.addLast(newCommitLogDispatcherBuildConsumeQueue());this.dispatcherList.addLast(newCommitLogDispatcherBuildIndex());可以看到,即使Broker宕机了,只要commitlog存在,就可以重建consumequeue和index文件。6.小结本文主要介绍RocketMQ的消息存储原理。RocketMQ的存储很艺术,但也很难理解。希望本文能带你入门RocketMQ存储。