Broker处理topic创建1.更改本地topic配置缓存topicConfigTable2.将缓存的topicConfigTable配置信息写入磁盘3.将更改信息上报给NameServer4.主从同步更改信息源码入口broker端AdminBrokerProcessor#processRequestbroker定时任务ScheduleMessageService:每10S持久化每个延迟队列的发送进度ConsumerOffsetManager:Broker每5S持久化一次消费进度,将ConsumerOffsetManager#offsetTable属性序列化到consumerOffset.json文件中,以覆盖的形式重写,offsetTable是一个Map类型的属性,key是:topic@consumeGroup,value是每个ConsumeQueue的消费进度,也是一个集合,key是id,value是offsetFlushConsumeQueueService:每1S刷新一次ConsumeQueue,当有新数据写入时通过一个ConsumeQueue超过2页(8kb),强制Flush数据到磁盘;同时每60S对所有ConsumeQueue进行一次flush,不管IndexFile中新写入的数据Store有多少,同时唤醒那些已经订阅了新消息所属队列的消费者,让它们执行消息拉取工作CommitRealTimeService(开启写入缓冲池):将缓冲池中的数据提交到CommitLog的FileChannelWrite):每500ms刷新一次CommitLog。当新写入的数据超过16KB,或者距离上次Flush的时间间隔超过10S时,将内存中的CommitLog中的数据同步到磁盘文件中。CleanCommitLogService:每10S执行一次清除失败的CommitLog日志文件,默认在72h之前清理CleanConsumeQueueService:每10S执行一次清除失败的ConsumeQueue和IndexFile文件PullRequestHoldService:为每个ConsumeQueue保存消息PullRequest,每隔5S,根据条件:maxOffset>pullFromOffset来决定是否唤醒订阅对应ConsumeQueue的PullRequestClient。关闭那些超过2m还没有发送过心跳的连接。每30秒向指定的一个或多个Namesrc注册Broker信息。consumeQueue生成ReputMessageServiceInheritedServiceThread是一个线程服务。服务启动后,每隔1毫秒调用一次doReput方法。doReput方法会调用CommitLogDispatcher进行消息分发步骤:获取索引文件的mapFile,放入msgconsumerQueue中,这是帮助消费者查找消息的索引文件。里面存放的是每条消息的起始点,消息的大小和tag的hashCode。索引生成和结构索引文件可以根据key快速检索到相应的消息。key分为两种,一种是UniqKey,由系统自动生成(由createUniqID()函数生成,类似于uuid)。还有一个自定义键,放在消息的属性中,由生产者指定。通俗的说就是在一个文件中找一条消息。入口:ReputMessageService继承自ServiceThread,是一个线程服务。服务启动后,每隔1毫秒调用一次doReput方法。doReput方法将调用CommitLogDispatcher进行消息分发。步骤:获取索引文件的mapFile,放入msgMappedFileQueueCommitLog消息存储,ConsumeQueue等通常会记录大量数据。一个MappedFile有固定的大小(默认1G),所以一个MappedFile不能记录所有的内容,所以CommitLog和ConsumeQueue通常使用多个MappedFile记录数据,而RocketMQ使用MappedFileQueue来组织一系列的MappedFile。MappedFileQueue末尾的MappedFiles通常只是填充或有一些空间或只是分配MappedFiles。每执行一次写操作,就会从队尾获取最后一个。用于写入的MappedFile。如果在启动时配置了transientStorePoolEnable,DefaultMessageStore构造函数中会调用TransientStorePool.init方法,将ByteBuffer预分配并放入队列中,并锁定内存以防止操作系统替换它。临时存储池只有主Broker才会启用,刷盘方式为异步刷盘,transientStorePoolEnable为true。如果没有开启transientStorePoolEnable,创建的时候会生成一个文件,然后映射这个文件。如果transientStorePoolEnable,那么每次创建的时候,不仅会生成一个文件,然后映射这个文件,还会从TransientStorePool中“借”一块堆外内存(堆外内存已经申请了)as一个writeBuffer,然后每次写数据的时候,先写入writeBuffer。每次写入数据时,都会从MappedFileQueue中获取最后一个MappedFile。如果MappedFileQueue为空或最后一个MappedFile已满,将重新分配一个新的MappedFile。如果写入数据时,剩余空间不够写入(写入数据大小+结束标记>剩余容量),则将剩余空间写入文件结束标记,然后返回END_OF_FILE。然后重新申请一个mappedFile,重写。MappedFileflush操作根据具体配置分为同步和异步两种方式。不管是同步还是异步,操作都差不多,都是通过MappedFile.commit和MappedFile.flush。如果启用临时存储池TransientStorePool,则首先调用MappedFile。.commit将writeBuffer中的数据写入fileChannel,然后调用MappedFile.flush;MappedFile.flush通过fileChannel.force或mappedByteBuffer.force()执行实际的刷新操作。如果writeBuffer满了,会返回给TransientStorePool,writeBuffer会被置为null。mq存储对比https://www.zhihu.com/question/346540432mq交易服务器1:收到交易修改消息主题为RMQ_SYS_TRANS_HALF_TOPIC,并备份消息原主题,用于后续恢复消息主题commitmessage用于修改消息queueId为0,备份消息原来的queueId,用于后续commit消息恢复时恢复消息queueId的正常存储信息。因为主题改变了,无法消费2:从事务校验判断事务响应,然后执行操作判断事务状态是committed,rollback还是pending。根据提交的事务或回滚的事务构造OperationResult来提交或回滚事务。Committhetransactioncheck准备消息并返回到remotingCommand以结束消息事务。endMessageTransaction使用之前存储的真实主题和queueId重建一条新消息。刷机进程将新消息写入commitLog,删除成功。deletePrepareMessage,并把消息的偏移量作为消息opQueue回滚事务检查准备消息返回remotingCommand返回成功状态不需要做任何事情,因为真正的消息还没有被放置,所以不需要删除deletePrepareMessage,把偏移量将消息作为消息放入opQueue返回OperationResult结果3:返回Checked并开启定时任务定时返回检查。不断消费halfQueue中的消息。如果在opQueue中找到消息的偏移量,则表示已处理。如果没有找到,则检查重新插入的次数、事务超时时间、立即检查事务的时间等,将消息放回halfQueue,并向客户端发送checkback请求。客户端根据本地事务状态发送提交和回滚。https://blog.csdn.net/hosaos/article/details/90240260nameservernameServer管理broker。broker注册时,topic信息(包括队列信息)、address、brokerId、brokerName会通过registerBrokerAll上报给nameServer。当代理启动时,计划任务调用registerBrokerAll。在更新代理配置和创建主题时,也会调用registerBrokerAll。brokerName->broker地址topic->queueDatacluster->brokerNamebrokerAddr->brokerl心跳信息brokerAddr->filterhttps://www.jianshu.com/p/3d8d594d9161pagecache读写:加pagecache锁,从哈希表中查找page,如果有pagereferencecount加一,lock(我们只需要在读写PageCache相关数据结构时加一个pagecache锁,当我们取出page的时候,引用计数加一,保证page不会被回收,那么如果该页被读取或写入,则以该页为粒度进行并发控制)如果该页是最新的:读取它!然后引用减一。如果页面不是最新的,给页面加排它锁(可能会休眠)拿到锁后,检查页面是否是最新的(所以其他进程可能在休眠时已经读取了这个页面)调用mapping->a_ops->readpage读取页面如果发生错误:释放引用并返回。注意,如果出错,会直接释放pagelock,否则wait_on_page:这个读是由底层完成的。在读取期间,页面被锁定。当读操作完成后,page会被锁住,所以wait_on_page会唤醒nowpage是最新的,已经降到之前的情况了。如果没有pagecache锁,使用page_cache_alloc分配一个page,加一个pagecache锁,再次查找hash表。如果存在,则可以还原为以前的情况。注意,你为什么要在这里再次查找?因为pageallocation时会释放pagecache锁,此时其他线程可能已经分配了需要的page,所以我们需要进行双重检查将page加入pagecache,释放pagecache锁,将page加入lru.注意lru使用了separate然后,问题就还原为上面的情况(代码中使用了goto)写入:计算文件地址空间中要写入的page的index__grab_cache_page(mapping,index,&cached_pa??ge):取从PageCache中获取需要的页面,并添加Lock,如果该页面不存在,则分配一个并添加到PageCache中。注意,此时当前线程持有页面的引用,页面处于锁定状态。这个过程和读过程类似,只是锁还没有释放。mapping->a_ops->prepare_write(file,page,offset,offset+bytes)__copy_from_usermapping->a_ops->commit_write(file,page,offset,offset+bytes)释放page的引用计数冲突:read和write在找的时候对应的页面,需要加pagecache锁。写请求锁定页面后,该页面的读请求需要等到写请求锁被释放后才能读取。//https://zhuanlan.zhihu.com/p/42364591放入消息时加锁RocketMQ在向CommitLog写入消息时采用了一种锁机制,即同一时刻只能有一个线程写入CommitLog文件。磁盘一次耗时比较长,锁竞争激烈。将会有更多的线程等待阻塞和等待锁。如果使用自旋锁,会浪费大量CPU时间,所以“同步刷机推荐使用可重入锁”。.异步刷写就是每隔一定时间刷写一次磁盘,写入隐式内存然后返回。锁持有的时间很短,所以锁竞争并不激烈,不会有大量线程阻塞等待锁。有时,锁会等待并旋转并等待。短时间内不进行上下文切换,所以使用自旋锁比较合适。request_replyProducer在发送请求时创建一个RequestResponseFuture,以correlationId为key,将RequestResponseFuture作为value存入map中,同时请求包含RequestResponseFuture中的correlationId和client的clientId。收到返回包后,根据correlationId获取对应的RequestResponseFuture,设置响应包内容。Producer端也会启动定时任务扫描map,检测请求是否超时。使用send发送请求。同步请求:每个RequestResponseFuture都有一个关闭的countDownLatch,收到这条消息的回复后解锁。生产者发送消息时,会为每条消息生成一个唯一标识,同时也会带上生产者的clientId。消费者接收并消费消息时,从消息中取出消息标识correlationId和生产者标识clientId,放入响应消息中,判断响应消息是从哪个请求消息返回的,响应消息应该是发送给哪个生产者。同时在响应消息中设置消息的类型和响应消息的主题,然后消费者将消息发送给代理。服务器接受请求和接受发送没有区别。服务端有处理回复请求的逻辑,也可以根据配置存储回复消息。broker收到响应消息后,需要将消息返回给指定的producer。Broker如何知道要发回给哪个生产者?由于消息中包含了生产者的标识clientId,在ProducerManager中,维护了该标识与频道信息的对应关系。通过这种对应关系,可以将返回包发送给对应的生产者。retrymessageRetry消息是consumer消费失败,需要broker重新发送的消息。失败的原因有两种,一种是业务端代码处理失败;另一种是消息在消费者的缓冲队列中停留超时,消费者会将消息从队列中移除,然后返回给Broker重新发送消息。重新发送到%RETRY%+consumerGrouptopic,这个topic在每个broker上都有一个,读写队列数为1。当消息到达broker后,会放在SCHEDULE_TOPIC_XXXX下面,然后是定时任务会读取对应的consumerQueue,从commitlog中读取消息放入retrytopic的队列中。客户端已经订阅了重试主题,此时消息可以被拉取下来。重试主题是消费者主题。每个经纪人都会有一个。同一组使用的标签必须相同。否则,注册的会覆盖之前注册的,导致消息丢失。标签过滤在服务器端通过散列过滤,在客户端通过等值过滤。tag1的消息在服务器端会被过滤,tag2的消息的一部分会加载到consumer1,导致consumer1对其进行过滤。consumer2只能消费部分消息。Rebalance的时机触发条件rebalance的条件是:每20s刷新一次(准确的说是在上次刷新后等待20s,@seeRebalanceService#run@seeClientRemotingProcessor#notifyConsumerIdsChanged每次Client启动时@seeDefaultMQPushConsumerImpl#start。每次client上报心跳,服务端会判断是否有配置变化或者新增id,如果有,会通知组下所有客户端重新平衡,当客户端下线时,也会通知组下所有客户端执行rebalancediskindexrrqm/s:每秒merge的读操作数,即delta(rmerge)/swrqm/s:每秒merge的写操作数,即delta(wmerge)/sr/s:every每秒完成的读I/O设备数,即delta(rio)/sw/s:每秒完成的写I/O设备数,即delta(wio)/srsec/s:每秒读取的扇区数。即delta(rsect)/swsec/s:每秒写入的扇区数。即delta(wsect)/srKB/s:每秒读取的K字节数。是rsec/s的一半,因为每个扇区的大小是512字节wKB/s:每秒写入的K字节数。是wsec/s的一半avgrq-sz:每次设备I/O操作的平均数据大小(扇区)。即delta(rsect+wsect)/delta(rio+wio)avgqu-sz:平均I/O队列长度。即delta(aveq)/s/1000(因为aveq的单位是毫秒)duiwawait:每次设备I/O操作的平均等待时间(毫秒)。即delta(ruse+wuse)/delta(rio+wio)svctm:每次设备I/O操作的平均服务时间(毫秒)。即delta(use)/delta(rio+wio)!!!svctm越接近await,等待时间越少%util:一秒内有多少百分比的时间用于I/O操作,或者一秒内有多少时间I/O队列不为空。80%表示设备已经很忙,即delta(usr)/s/1000(因为使用单位是毫秒):向设备发出的请求的平均队列长度。qusize=(rs+ws)*await/1000rocketmq如果磁盘压力大,性能为:1:ioutil几乎达到100%2:r/s(readscompletedpersecondNumberofI/Odevices),增加说明:rocketmq一般情况下读取热点数据,基本不需要去磁盘读取。如果它读取历史消息,会导致io指向磁盘,r/s读取次数会增加。这时因为读写都使用了pagecache,所以写请求的处理会变慢,w/s写的次数会增加。
