Slave节点分布判断消息堆积过大切换到Slave进行查询。maxOffsetPy为当前最大物理偏移量,maxPhyOffsetPulling为该消息拉取的最大物理偏移量。它们之间的差值可以代表消息的累积量。TOTAL_PHYSICAL_MEMORY_SIZE表示当前系统物理内存。accessMessageInMemoryMaxRatio的默认值是40。通过以上逻辑可以计算出当前消息累积是否大于物理内存的40%,如果大于则设置suggestPullingFromSlave为true。//https://www.jianshu.com/p/cd138e67dca0masterdowntime1情况:nameServer没有检测到broker下线,不管client是否rebalance,producer和consumer都不知道broker宕机2情况:nameServer检测到broker下线了,但是client没有rebalance,生产者和消费者不知道broker挂了。3情况:nameServer检测到broker下线,client发生rebalance,生产者和消费者感知到broker下线。#nameServer管理代理。broker注册时,topic信息(包括队列信息)、address、brokerId、brokerName会通过registerBrokerAll上报给nameServer。当代理启动时,计划任务调用registerBrokerAll。在更新代理配置和创建主题时,也会调用registerBrokerAll。nameServer启动时,会启动一个定时任务,定时检测broker的心跳。如果经纪人的心跳消失了,它将被删除。只有当所有具有相同brokerName的broker都消失了,相应的queue信息才会被移除。Sender:在1和2中,向broker上的队列发送消息时,会失败。如果是同步发送,会重试,重试可能会失败,最后会抛出异常。3点,producer端会发现queue的masterId的broker挂了,会抛出异常。Consumer:在1和2,在这个broker上拉取队列消息时,会失败。在推送模式下,稍后会被拉取。3时,consumer会发现queue的masterId的broker挂了。它会寻找具有相同brokerName的broker地址来拉取消息,即从中拉取消息。对于producer来说,如果master挂了,向broker上的队列发送消息会失败,并抛出异常给用户。用户可以指定一个队列选择器(或者自己实现)来排除向broker上的队列发送消息。对于consumer,master挂了,rebanlance之后,会去broker拉取消息。多线程写磁盘速度是否提高https://blog.csdn.net/u013043103/article/details/84326462(1)读写最好不要多线程,硬盘读取速度和写是有限的,单线程时间已经满载,多线程会增加线程间的切换,从而增加时间。如果想提高读写速度,就加大硬盘,做raid(2)。首先,硬盘的写入是串行的,CPU的计算是并行的。如果你更注重计算,那么多线程可以提高,或者所谓的并行至于计算;如果侧重于存储,除非数据量足够体现优势,否则线程间切换的损失当然会更加低效。(3)这是基于算法的。目前大部分算法速度都很快,瓶颈在磁盘的IO上。大部分算法我们都测试过了,基本上一半以上的时间花在了磁盘IO上。比如我处理一张图片,处理数据需要1分钟,写图片需要2分钟。然后你把你的算法优化的很好,10秒就搞定了。你的效率提高了多少,但是如果我多线程写的话,我的效率翻倍,就是写图片需要1分钟,所以这个效率显然比优化你的算法更实惠。这个东西还是得针对算法。(4)单线程顺序写入时磁盘IO最快。如果多线程写入,需要不断的重新寻址磁头,写入速度反而会变慢。基于msgid的幂等性是不可靠的。当出现网络波动、网络延迟等诸多问题时,客户端向服务端发送消息时,服务端正常写入commit-log,可以响应客户端(ACK)时间失败。导致两条内容相同的消息的MsgId和OffsetMsgId不同。最后还是反复消费。如果consumer消费失败,调用sendMessageBack方法将消息发送给broker。Change(因为新消息存储的地址变了),uniq_key属性保存了原消息msgId并启用临时内存transientStorePoolEnablemappedFile有两种形式,一种只是映射内存,写入和读取都是从映射内存读取。但是读写都是直接操作pagecache,在读取大量冷数据,pagecache吃紧的情况下,会导致读写交互,可能会导致写超时(默认200ms以上)。一种是临时内存+映射内存。临时内存是jvm向操作系统申请的直接内存,会直接加锁,防止内核将这块内存替换到硬盘上。在这种形式下,writing是直接写入暂存器,然后返回。如果读取,映射内存仍然被读取。直接内存将有一个服务提交到映射内存。写入直接内存但未提交的数据无法读取。commit策略会在每次put消息结束时唤醒刷机服务。如果未提交的数据超过leastPages(默认4k)或者距上次提交的时间超过commitDataThoroughInterval(默认200ms),就会提交一次。启用临时内存会优化读写冲突,提高性能,相当于批量写入pagecache,尤其是当pachecache读取冷数据吃紧的时候。但是消息的写入不频繁(200ms以内,消息量小于4k),消息会造成明显的额外延迟。在正常的写压力下,基本不会有额外的延迟。DefaultrouteDefaultrouteTBW102说明如果broker开启autoCreateTopicEnable(默认为true),broker会向namesrv注册默认路由topic:TBW102当producer从namesrv中获取不存在的topic路由时,会取TBW102的路由信息??,查看哪些broker支持自动创建topic,然后构造新的topic路由信息。这里有一个陷阱。如果使用默认路由,则主题只能在一个代理中创建。比如下面这种情况,broker-a和broker-b支持一开始自动创建路由,producer在本地更新本地路由表[topicnew->broker-a;topicnew->broker-b],然后发消息给broker-a,broker-a更新topicnew到namesrv,此时namesrv上只有[topicnew->broker-a]路由信息,如果一段时间producer如果没有消息发送给broker-b,然后从namesrv更新topicnew的路由信息??,新添加的topic只会路由到一个broker,没有高可用。所以当消息不是很频繁时,最好手动创建主题。起点的逻辑默认是:最新点所属的消费者组是新上线的,订阅的消息,最早的消息(offset=0)还在内存中。rocketmq的设计者认为你是一个刚上线的业务,从第一条消息开始就会强制开始消费。如果订阅的消息已经生成过期消息。那么消费就会从我们客户端启动的时间点开始。设置为起点:客户端计算逻辑会为0,然后服务端发现该点小于最小点,会修正该点,设置为最小点并设置时间戳:如果消费者组之前消费过某个topic,参数setConsumeFromWhere无效。broker只要找到消费位置,就是基于broker的RebalancePushImpl#computePullFromWhereWithExceptionConsumerManageProcessor#queryConsumerOffsetDefaultMessageStore#getMessage事务回调实现TransactionalMessageCheckService类,这个类包含一个线程,这个线程默认每分钟触发一次事务检查,在它的onWaitEnd方法中,其实就是调用了TransactionalMessageService的check方法。粗略的回顾逻辑:HalfTopic对应的队列存放prepare消息,OperationTopic对应的队列存放prepare消息对应的commit/rollback消息,消息体包含prepare消息对应的offset。服务器通过比较两个队列之间的差异,找出超时还没有提交的事务,并检查回来。https://www.jianshu.com/p/feda710d9716刷盘服务https://blog.csdn.net/yecong111/article/details/103858172省位逻辑每个队列都会有一个ProcessQueue来维护每次刷盘的消费情况拉取一批消息时,会将消息放入一个treeMap中,key为offset,value为msg。每消费一条消息,这条消息就会从treeMap中移除。然后将内存中的offsetTable的信息更新到treeMap的顶部。然后,当MQClientInstance启动时,会启动一个定时任务,定期向远程报告位置。根据队列获取brokerName,然后获取地址,然后发送请求。拉取消息时,如果本地位置大于0,则设置可提交位置,提交本地位置。如果经纪人是奴隶,它将忽略它。如果是master,就会更新位置到内存中。broker端内存维护着每个消费组的消费点,持久化定时任务(BrokerController#init启用的定时任务)。ps:本站这条消息没有消费,需要消费。延迟消息延迟队列的核心思想是:当Producer发送消息时,指定delayLevel;或者Consumer消费消息时,返回RECONSUME_LATER,或者主动发送MessageBack(...,intdelayLevel)时,会将消息回传给Broker,Broker将消息做一个封装,指定topic为SCHEDULE_TOPIC_XXXX,QueudId=delayLevel-1,如果不指定delayLevel,默认为ReConsumeTimes+3,将打包后的消息存储在CommitLog中,ReputMessageService为其生成PositionInfo,tagsCode存储延迟的投递时间,存储在"的ConsumeQueue中SCHEDULE_TOPIC_XXXX”。有16个delayLevel,所以大多数情况下SCHEDULE_TOPIC_XXXX中会有16个ConsumeQueues。Broker启动时,ScheduleMessageService会启动16个线程对应16个delayLevel读服务,有序读取ConsumeQueue中的PositionInfo。ScheduleMessageService会在[当前时间<=延迟投递时间]时从CommitLog中提取消息,去除封装,擦除delayLevel属性,重新存入CommitLog,并立即更新延迟投递偏移量dealOffset。ReputMessageService再次为当前消息生成PositionInfo,因为没有delayLevel,PositionInfo存放在Topic为%RETRY%+consumeGroup,queueId为0的ConsumeQueue中。每个consumer在启动时订阅自己consumergroup的retryqueue,并且当重试队列中有位置信息时,会拉取相应的消息重新消费。消息的第一次重试会被送回原消费者(执行sendMessageBack的消费者),后续多次重试都会被订阅QueueId=0的消费者消费。消费者发送回消息时,可以指定延迟级别,默认级别:1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h,也就是说delayLevel=3表示在a之后重新发送延迟10秒,最大重量16次的试用次数对应2小时后发货,每增加一次消费,发货时间增加到下一阶段。当延迟级别delayLevel<0时,放入死信队列。重置站点重置站点的执行顺序是按照admin到broker再到consumer的顺序触发的。管理员负责构造参数以通知代理。broker负责查询consumeQueue的具体位移。经纪人负责通知消费者重置位移。根据时间戳找到consumeQueue对应的位移,然后broker通知consumer持久化消费位移,最终会持久化到broker的消费位移中。重置位置的操作本质上是在消费者端执行的。消费者端负责持久化新的消费位移,然后定时任务通知broker更新消费位移。整个位移重置过程中,消费者会设置ProcessQueue的状态为Dropped,从而阻塞消息拉取任务ConsumeRequest的执行,从而阻塞消息拉取,然后在消费者端修改消费位移,并通知broker修改通过心跳对消费者进行消费置换。最后通过rebalance流程开始重新消费消息。主从数据不一致的原因是磁盘放置滞后于主从复制。这样一来,当master挂掉的时候,数据可能没有放到磁盘上,但是这些数据可以从slave上消费,而当master恢复后,这部分数据就丢失了,造成数据不一致。主从同步RocketMQ的主从同步机制如下:(1)slave启动并与master建立连接(2)slave每隔5秒向master拉取消息。如果是第一次拉取,先获取本地commitlog文件中的最大偏移量,在这个偏移量处从服务器拉取消息;(3)master返回数据给slave(4)slave将数据写入自己的commitLog,并更新offset;重复上述步骤从服务端定时调度syncAll方法,定时向主服务器同步消费组订阅信息、消费站点、延迟消息站点、主题配置。BrokerController#handleSlaveSynchronize
