数据存储,RocketMQ采用文件编程模型。为了提高文件的写入性能,通常会引入内存映射机制。数据首先写入页面缓存,然后页面缓存数据在适当的时候刷新到磁盘。编写时必须考虑性能和数据可靠性。刷盘策略一般有同步和异步刷盘策略,RocketMQ也是一样,默认使用异步刷盘。简单看一下RocketMQ刷入操作的代码块:if(writeBuffer!=null||this.fileChannel.position()!=0){this.fileChannel.force(false);}else{//注意4.8.1:同步磁盘放置this.mappedByteBuffer.force();}}catch(Throwablee){log.error("强制数据写入磁盘时出错。",e);可以看到刷机实际上是调用了MappedByteBuffer的force方法。同步刷盘同步刷盘是指broker收到消息发送者的消息后,先写入内存,然后将内容持久化到磁盘,再返回消息给客户端表示消息发送成功。刷盘需要分两行来分析:第一行是broker启动时会启动一个刷盘线程,调用路径为:BrokerController#start()->DefaultMessageStore#start()->CommitLog#start()->GroupCommitService#start()->MappedFileQueue#flush();第二行是broker收到消息后加载或更新MappedFile,然后存入MappedFileQueue。调用路径为:SendMessageProcessor#processRequest()->DefaultMessageStore#putMessage()->CommitLog#putMessage()->CommitLog#handleDiskFlush()->GroupCommitRequest#waitForFlush()。第一行的刷盘线程会在while循环中每隔10ms执行一次刷盘操作。刷盘成功后会唤醒等待第二行响应的线程。第二行组装MappedFileQueue(CopyOnWriteArrayList类型)后,会调用countDownLatch的await方法等待刷机线程的执行。//broker收到消息组装MappedFileQueue后,等待flush线程执行publicvoidhandleDiskFlush(AppendMessageResultresult,PutMessageResultputMessageResult,MessageExtmessageExt){//同步flushif(FlushDiskType.SYNC_FLUSH==this.defaultMessageStore.getMessageStoreConfig(Disk))){finalGroupCommitService服务=(GroupCommitService)this.flushCommitLogService;如果(messageExt.isWaitStoreMsgOK()){GroupCommitRequest请求=newGroupCommitRequest(result.getWroteOffset()+result.getWroteBytes());service.putRequest(请求);/等待刷新线程执行booleanflushOK=request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if(!flushOK){log.error("dogroupcommit,waitforflushfailed,topic:"+messageExt.getTopic()+"tags:"+messageExt.getTags()+"客户端地址:"+messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}}else{service.wakeup();}}//异步刷新//注意4.8.2:异步刷新else{if(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){flushCommitLogService.wakeup();}else{commitLogService.wakeup();}}}for(GroupCommitRequestreq:this.requestsRead){//下一个文件中可能有消息,所以最多//两次刷新booleanflushOK=false;for(inti=0;i<2&&!flushOK;i++){//当前flush指针大于此消息对应的物理偏移量表示刷入完成flushOK=CommitLog.this.mappedFileQueue.getFlushedWhere()>=req.getNextOffset();if(!flushOK){//刷新操作CommitLog.this.mappedFileQueue.flush(0);}}//唤醒等待刷新的Threadreq.wakeupCustomer(flushOK);}同步刷盘需要说明的是,每次刷盘的时候不是一个消息,而是一组消息。如果客户端成功返回,则说明消息已经持久化到磁盘,即消息非常可靠,但是是以牺牲写入性能为代价的。但是由于RocketMQ的消息是先写入PageCache的,所以消息丢失的可能性比较大。小,如果可以容忍一定概率的消息丢失但又可以提高性能,可以考虑使用异步刷新。异步刷新是指Broker将消息存入PageCache后立即返回成功,然后启动一个异步线程执行FileChannel的force方法定时将内存中的数据刷新到磁盘,默认间隔500ms。RocketMQ中异步刷机的实现类是FlushRealTimeService。看到默认的间隔是500ms,你猜是不是FlushRealTimeService用的是定时任务?其实并不是。这里介绍带超时的CountDownawait方法。这样做的好处是,如果没有新消息写入,它会休眠500ms,但是在收到新消息后,可以将其唤醒,从而及时刷新消息,而不是一定要等500ms。刷盘线程的等待在CommitRealTimeService#run方法中,刷盘线程的唤醒在CommitLog#handleDiskFlush的异步分支中。文件恢复在这里只是简单的提一下。文件恢复分为正常退出后的文件恢复和异常退出后的文件恢复。正常退出后恢复:基于ConsumerQueue,获取其中消费的最后一条消息的物理偏移量。如果offset大于CommitLog文件中的offset,则删除ConsumerQueue中的冗余数据;如果小于CommitLog文件中的offset,则重新发送多出来的物理offset对应的消息,保证两个文件一致。异常后恢复:broker会记录最后一次flushcommitlog、index、consumequeue等文件的时间戳,然后记录一个checkpoint时间戳(checkpoint文件也会被flush生成一个文件)。以checkpoint中的时间戳为基准,对比commitlog中的时间戳,进行相应的操作。RocketMQ在启动时会创建一个名为abort的文件,然后在正常退出时删除该文件。因此,判断RocketMQ进程是否异常退出,只需要检查abort文件是否存在即可。如果存在,则表示异常退出。文件恢复入口:DefaultMessageStore#recover,详见:从RocketMQ学习基于文件的编程模式(二)另外,这里涉及到的缓存页与MappedByteBuffer和零拷贝有关。可以参考之前的一篇文章:ZeroinJavaCopy相关文章:RocketMQ源码-MappedFile介绍,其中涉及到TransientStorePool暂存池、MappedFile预分配、编写和烧写参考文章:从RocketMQ学习基于文件的编程模式(2))RocketMQ源码分析之消息存储
