今天给大家分享一个学习RocketMQ系统架构核心知识点的回顾与总结。合作伙伴的帮助。RocketMQ是阿里巴巴的分布式消息中间件。2012年开源,2017年成为Apache顶级项目。1集群架构RocketMQ的集群架构如下图所示:从上图可以看出,整个集群有四个角色:NameServer集群、Broker主从集群、Producer、Consumer。1.1NameServer集群部署NameServer集群,但是节点之间不会同步数据,因为每个节点都会保存完整的数据。因此,如果单个节点挂掉,不会影响集群。1.2BrokerBroker采用主从集群实现多副本存储和高可用。每个Broker节点必须与所有NameServer节点建立长连接,定义并注册Topic路由信息并发送心跳。与所有NameServer建立连接,这样Broker的使用不会因为单个NameServer宕机而受到影响。Broker主从模式下,Slave节点主动从Master节点拉取消息。1.3ProducerProducer与NameServer任意节点建立长连接,定时从NameServer拉取Topic路由信息。Producer是否使用集群取决于其所在的业务系统。1.4ConsumerConsumer与NameServer任意节点建立长连接,定时从NameServer拉取Topic路由信息。Consumer是否使用集群取决于其所在的业务系统。Producer和Consumer只和任意一个NameServer节点建立连接,因为Broker会向所有NameServer注册Topic信息,所以每个NameServer保存的数据其实是一致的。2MessageQueueProducer发送的Message会保存在Broker的MessageQueue中,如下图:借助MessageQueue,Topic可以在Broker中实现分布式存储,如上图,Broker集群中保存了4个MessageQueue,这些MessageQueue保存Topic1-Topic3这三个Topic的消息。MessageQueue类似于Kafka中的Partition。通过MessageQueue,Producer可以并发发送消息给Broker,Consumer也可以并发消费消息。默认情况下,Topic可以创建的MessageQueue个数是4个,Broker可以创建的MessageQueue个数是8个,RocketMQ选择两者中较小的个数,即4。但是,这两个值是可配置的。3ConsumerRocketMQ的消费模式如下:图中Topic1的消息写入了两个MessageQueue,两个队列分别存储在Broker1和Broker2上。RocketMQ通过ConsumerGroup实现消息广播。比如上图中有两个消费者组,每个消费者组有两个消费者。一个消费者可以消费多个MessageQueue,但是同一个MessageQueue只能被同一个消费者组的一个消费者消费。比如MessageQueue0只能被ConsumerGroup1中的Consumer1消费,不能被Consumer2消费。4Broker高可用集群Broker集群如下图所示:Broker通过主从集群实现消息高可用。与Kafka不同,RocketMQ没有Master节点选举功能,而是采用了多Master多Slave的集群架构。Producer在写入消息时,写入Master节点,Slave节点主动从Master节点拉取数据,保持与Master节点的数据一致。Consumer消费消息时,可以从Master节点和Slave节点拉取数据。是从Master拉取还是从Slave拉取,取决于Master节点的负载和Slave的同步情况。如果Master负载高,Master会通知Consumer去Slave拉取消息,如果Slave同步消息进度延迟,Master会通知Consumer去Master拉取数据。简而言之,由Master决定是从Master拉取还是从Slave拉取。如果Master节点出现故障,RocketMQ会使用基于raft协议的DLedger算法进行主从切换。Broker每30秒向NameServer发送一次心跳。如果NameServer在120s内没有收到心跳,则判断Brokerdown了。5消息存储RocketMQ的存储设计很有创意。主要存储三个文件:CommitLog、ConsumeQueue、Index。如下图所示:5.1CommitLogRocketMQ消息存储在CommitLog中,每个CommitLog文件大小为1G。有趣的是,文件名并不叫CommitLog,而是以消息的偏移量命名的。例如第一个文件的文件名为0000000000000000000,第二个文件的文件名为00000000001073741824,依此类推得到所有文件的文件名。有了上面的命名规则,给定一条消息的偏移量,就可以根据二分查找快速找到消息所在的文件,消息减去文件名就可以得到消息在文件中的偏移量抵消。RocketMQ在写入CommitLog时采用顺序写入,大大提高了写入性能。5.2如果ConsumeQueue直接从CommitLog中获取Topic中的一条消息,效率会很低,因为需要从文件中的第一条消息开始顺序查找。引入ConsumeQueue作为CommitLog的索引文件,将大大提高检索效率。一开始我不明白ConsumeQueue和MessageQueue的区别。在网上查了一些资料,发现每个ConsumeQueue对应一个上面介绍的MessageQueue,MessageQueue只是一个概念模型。ConsumeQueue中的元素内容如下:前8个字??节记录了消息在CommitLog中的偏移量。中间4个字节记录消息大小。最后8个字节记录消息中标签的哈希码。这个标签的作用非常重要。如果一个Consumer订阅了TopicA、Tag1和Tag2,那么这个Consumer的订阅关系如下:可以看到,这个订阅关系是一个hash类型的结构,key是Topic名称,value是一个SubscriptionData一个封装标签的类型的对象。拉取消息时,先从NameServer获取订阅关系,获取当前Consumer所有订阅标签的hashcode集合codeSet,然后从ConsumerQueue获取一条记录,判断标签hashcode的后8字节是否在codeSet决定是否发送消息SendtoConsumer。5.3索引文件RocketMQ支持根据消息的属性来查找消息。为了支持这个功能,RocketMQ引入了Index索引文件。Index文件由文件头IndexHead、500万个哈希槽和2000万个Index条目三部分组成。5.3.1IndexHead一共有6个元素。前两个元素表示在当前Index文件中放置第一条消息和最后一条消息的时间。第三个和第四个元素表示当前索引文件中的第一条消息。CommitLog文件中第一条消息和最后一条消息的物理偏移量,第五个元素表示当前Index文件中的哈希槽数,第六个元素表示当前Index文件中的索引条目数。搜索时,除了传入key外,还需要传入第一条消息和最后一条消息的放置时间。这是因为索引文件名是以时间戳命名的,传入放置时间可以更准确地定位索引文件。5.3.2Hashslot熟悉Java中HashMap的同学应该对Hashslot的概念不陌生,Hashslot其实就是Hash结构的底层数组。Index文件中的Hash槽有500万个数组元素,每个元素是一个4字节的int类型元素,存放的是当前槽下最新索引条目的序号。这里Hash槽解决Hash冲突的方法是链表法,如下图:5.3.3Indexentry在每个Indexentry中,key的hashcode占4个字节,phyoffset表示的物理偏移量CommitLog中的message占8字节section,timediff表示消息的放置时间与header中的beginTimestamp的差值占4字节,preindexno占4字节。preindexno保存当前Hash槽中上一个索引条目的序号,一般只有在key发生Hash冲突时才有值,否则值为0,表示当前元素为第一个元素哈希槽。timediff保存在Index条目中以防止密钥重复。查找key时,如果key相同,输入的时间范围不满足timediff,则查找preindexno.5.3.4本节小结通过以上分析,我们可以总结出一个通过key在Index文件中查找消息的过程,如下:计算key的hashcode;根据hashcode找到Hash槽中的位置s;计算Index文件Position40+(s-1)*4中的Hash槽;读取这个槽的值,即索引条目号n;计算索引条目在Index文件中的位置,公式:40+500万*4+(n-1)*20;读取这个entry,比较key的hashcode和indexentry中的hashcode是否相同,key传入的时间范围是否和Indexentry中的timediff匹配。如果不满足条件,则查找preindexno的条目,找到后,从CommitLog中取出消息。6刷盘策略RocketMQ采用灵活的刷盘策略。6.1异步刷盘当消息写入CommitLog时,并不会直接写入磁盘,而是先写入PageCache缓存,然后使用后台线程将消息异步刷盘。异步刷写策略是消息写入PageCache后立即返回成功,这样写入效率非常高。如果可以容忍消息丢失,异步刷新是最好的选择。6.2同步刷盘即使同步刷盘,RocketMQ也不会刷每条消息。线程将消息写入内存后,会请求刷盘线程刷盘,但是刷盘线程不会只写当前请求的消息,而不会刷盘,而是将要刷的消息一起刷。同步刷策略保证了消息的可靠性,但同时也降低了吞吐量,增加了时延。7小结本文用7张图总结了RocketMQ的核心知识,希望能给大家带来快速入门。
