前言大家好,我是三君,一个自我救赎路上的非典型程序员。“一张图”系列旨在通过“一张图”系统地分析一个板块的知识点:三君子向来不喜欢零散的知识点,通过一张图将分散的知识点串联起来可以让我们有更深入更系统的认识一个盘子。同时本系列尽量简洁,希望能让大家用20%的时间快速理解本节下80%的内容。本文为“一张图”系列的第一篇:一张图分析RocketMQ。为了描述方便,整个系列在画图的时候被分成了很多小模块,也是按照模块一步步进行讲解。一张图分析RocketMQ原图,不好画,记得关注公众号:三位先生,一张图分析RocketMQ会深入到源码层面,源码就不贴了文章。在阅读源码的时候,三老师写了很多笔记,可以降低阅读源码的难度。需要的同学可以到三君的仓库里Fork:rocketmqrelease-4.3.0本文是《一张图解析 RocketMQ》系列的第一篇文章,今天这篇文章的内容主要分为三个部分:整体架构:RocketMQ的完整架构将从大家熟悉的“生产者消费者模型”开始逐步介绍,大家只需要记住一张完整的架构图即可。元数据管理:我把RocketMQ集群的元数据整理成一张图,让大家直观的了解元数据有哪些,各有什么用。消息收发示例:通过Docker部署RocketMQ,用简单的例子连接RocketMQ的消息收发流程。总体架构什么是消息队列?顾名思义,首先要有一个队列,用来存储消息。然后就是消息队列,要有人放进去,还要有人拿进去,是不是有种似曾相识的感觉?这就是连小学生都知道的经典“生产者消费者模型”吗?接下来我们就来看看它里面穿的是什么?别着急,我们先来重温一下“生产者消费者模型”的老朋友。简单来说,这个模型由两种线程和一个队列组成:生产者线程:生产产品,放入队列。消费者线程:从队列中获取产品并消费。有了这个队列,生产者只需要关注生产,不需要关心消费者的消费行为,更不需要等待消费者线程执行完毕;消费者只需要消费,不管生产者如何生产,更不用说等待生产者生产了。这意味着生产者和消费者是解耦和异步的。这很棒,因为我们的很多生活都是异步的。比如最近新冠疫情卷土重来。我点的外卖只能送到小区门口的外卖排队处,我也只能到外卖排队处取外卖,然后狼吞虎咽。如何实现“生产者消费者模型”?我想每个上小学的人都学过。让我们看看这个模型存在哪些问题。最大的问题是我们小学时学的“生产者消费者模型”是单机版的,只能自己打气了。这就相当于,我是一个外卖骑手,我点了一份外卖放到外卖队列,然后我从外卖队列中取走,操作猛如虎!于是就有了进化版。我们把消费者、队列和生产者放在不同的服务器上。这就是传说中的分布式消息队列。生产者生产的消息通过网络传递到队列存储,消费者通过网络从队列中获取消息。但是还有一个问题。新闻可能有很多种。把它们都放在一起不是很乱吗?我点的外卖和快递都是放在一起的,太难找了。所以我们需要区分不同类型的消息,相同类型的消息称为一个Topic。同时骑手不可能只有一个,点餐也不会只有我一个,所以有生产者群体,也有消费者群体。但是还有一个问题。社区太大了,一个队列放不下。我住在小区南门,点了外卖还要到北门去取,真是蛋疼。因此,物业在东南门和西北门分别设置了外卖快递点。即我们有多个队列组成一个队列集群。然而,问题又来了(没完没了),一个小区派送的队列那么多,骑手怎么知道去哪里派送,我又怎么知道去哪里取货?很简单,导航。我们称导航信息为路由信息。这些信息需要管理。它告诉生产者一个主题消息可以发送到哪些队列,同时告诉消费者可以从哪些队列中取出你需要的消息。RocketMQ为这些路由信息设置了管理员NameServer。当然也可以有很多NameServer组成一个NameServer集群。至此,你应该知道RocketMQ里面有什么了。包括生产者(Producer)、消费者(Consumer)、NameServer和队列本身(Broker)。Broker是代理的意思,负责队列的访问等操作。我们可以将Broker理解为队列本身。NameServer:我们可以同时部署很多NameServer服务器,这些服务器是无状态的,节点之间没有信息同步。NameServer唤醒后,监听端口,等待Broker、Producer、Consumer连接。NameServer是集群元数据管理中心。Broker:Broker启动,与所有NameServers保持长连接,每30s发送一个心跳包(像心跳一样持续稳定发送请求)。心跳包中包含当前的Broker信息(IP+端口等),存储了所有的Topic信息。注册成功后,在NameServer集群中就有了Topic和Broker的映射关系。我们可以同时部署多个Master和多个Slave。一个Master可以对应多个Slave,但一个Slave只能对应一个Master。Master和Slave需要有相同的BrokerName但不同的BrokerId。BrokerId为0表示Master,非0表示Slave,但只有BrokerId=1的slave服务器才会参与消息读取负载。(Broker的主从作用可以暂时忽略)Topic:在发送和接收消息之前,先创建一个Topic。创建Topic时需要指定Topic存储在哪些Brokers上,也可以在发送消息时自动创建Topic。生产者:生产者发送消息。启动时,首先与其中一个NameServer集群建立长连接,并从NameServer中获取当前发送的Topic存在于哪些Brokers上,使用轮询策略选择一个队列,然后与该队列所在的Broker通信位于。建立长连接,向Broker发送消息。消费者:消费者类似于生产者。与其中一台NameServer建立长连接,获取当前订阅的Topic存在于哪些Broker上,然后直接与Broker建立连接通道开始消费消息。刚才我们提到骑手不止一个,接外卖的不止我一个,所以才会有生产者组合消费者群的概念。这里需要补充一点,消息分为集群消息和广播消息:集群消息:一个topic一个消息,一个consumergroup只能被一个consumer实例消费。比如同一个外卖话题,一个外卖,我们整个社区只有一个人消费,这就是集群消费。广播消息:一个topic的一条消息会被一个consumergroup的所有consumer实例消费。比如政府因为疫情发食品,我们社区的每个人都会去消费,也就是广播消费。元数据管理因为Producer、Consumer、Broker都需要和NameServer打交道,所以三位负责人要先跟大家聊聊NameServer的神圣性。上面提到NameServer是集群的元数据管理中心,那么它管理的是什么元数据呢?下面我们就来看看NameServer都有些什么吧,看完记得关注、转发、点赞、收藏哦。简单来说,NameServer就是我们整个RocketMQ集群的元数据管理中心,负责集群元数据的增删改查。不管这个增删改查是怎么实现的,我们甚至可以理解为是对数据库的增删改查,但是我们必须知道这些元数据长什么样子。为了知道Producer、Consumer和Broker是如何根据这些数据收发消息的。如图所示,两主两从的broker集群相关的元数据信息包括topicQueueTable、BrokerAddrTable、ClusterAddrTable、brokerLiveInfo、FilterServer(暂不关注,图中未展示)。HashMap>topicQueueTable:Key是Topic的名称,里面存放了所有的Topic属性信息。Value是一个QueueData列表,其长度等于此Topic数据中存储的MasterBrokers的数量。QueueData存储Broker名称、读写队列数、同步标志等。HashMapbrokerAddrTable:该结构体存储了一个BrokerName对应的属性信息,包括其所属Cluster的名称,一个MasterBroker和多个SlaveBroker的地址信息HashMap>clusterAddrTable:存放的是集群中Cluster的信息,即一个Cluster名称对应一个由BrokerName组成的集合。HashMapbrokerLiveTable:关键是BrokerAddr对应一台机器,BrokerLiveTable中存储的内容是这台Broker机器的实时状态,包括最后一次更新状态的时间戳。NameServer会定时检查这个时间戳,没有超时更新就认为Broker无效,将其从Broker列表中清除。HashMapFilterServer>filterServerTable:Key是Broker的地址,Value是这个Broker关联的多个FilterServer的地址。FilterServer即过滤服务器,是RocketMQ的一种服务端过滤方式。一个Broker可以有一个或多个FilterServer。其他角色会主动向NameServer上报状态,根据上报消息中的请求码做相应的处理,更新存储的相应信息。Broker收到创建Topic的请求后,向NameServer发送注册信息。NameServer收到注册信息后,首先更新Broker信息,然后以Master的角色为每个Broker创建一个QueueData对象。如果是新的topic,就是添加一个QueueData对象;如果是修改一个topic,就是删除旧的QueueData,增加一个新的QueueData。Broker发送给NameServer的心跳会更新时间戳,NameServer每10秒检查一次校验时间戳。如果检查时间戳超过2分钟,Broker会被认为失效,会触发清理逻辑。连接断开事件也会触发状态更新。当NameServer和Broker的长连接断开时,会调用onChannelDestroy函数清除这个Broker的信息。Producer/Consumer启动后,会与NameServer建立长连接,定时从NameServer获取路由信息并保存在本地。消息的发送和拉取都会用到上面的数据。这么多数据,相信大家都有点头晕了。三位先生简单总结一下:NameServer通过brokerLiveInfo维护存活的Broker。Producer会获取上述路由信息,并指定将消息发送到哪个Topic。根据Topic可以从topicQueueTable中选择一个Broker,根据BrokerName可以从BrokerAddrTable中获取BrokerIP地址。有了地址Producer就可以通过网络将消息传递给Broker。消息收发实例RocketMQ部署我们刚刚了解了RocketMQ的整体架构,那么如何通过RocketMQ收发消息呢?需要通过Docker部署一套RocketMQ:如果没有安装Docker,可以按照菜鸟教程MacOSDocker安装/WindowsDocker安装安装。然后,通过docker-compose部署RocketMQ:克隆docker-middleware仓库,打开RocketMQ目录;修改broker.conf中的brokerIP1参数为本地IP;进入docker-compose.yml文件所在路径,执行docker-composeup命令;注意:如果你现在不了解Docker也没关系,按照步骤部署RocketMQ即可,不妨碍我们对RocketMQ相关内容的理解。部署完成后,我们可以在DockerDashboard中看到RocketMQ相关的容器,包括Broker、NameServer、Console(RocketMQ控制台)。这里我们可以使用部署好的RocketMQ来收发消息。RocketMQ部署完毕,我们来看一个简单的消息收发示例,可以说是RocketMQ的“HelloWorld”。消息发送publicclassSyncProducer{publicstaticvoidmain(String[]args)throwsException{//实例化消息生产者ProducerDefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name");//设置NameServer的地址producer.setNamesrvAddr("localhost:9876");//启动Producer实例producer.start();//创建消息并指定Topic、Tag和消息体Messagemsg=newMessage("Topic1","Tag","Key","Helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET));//向Broker发送消息SendResultsendResult=producer.send(msg);//通过sendResult返回消息是否发送成功System.out.printf("%s%n",sendResult);//如果不再发送消息,则关闭Producer实例。生产者.shutdown();}}首先实例化一个producer生产者,告诉它NameServer的地址,这样producer就可以从NameServer获取路由信息。然后producer要做一些初始化(这是很关键的一步),需要和NameServer通信,需要先建立通信连接。生产者准备好了,所以它要准备发送的内容,发送“HelloWorld”到Topic1。当内容准备好后,生产者就可以发送消息了。生产者如何知道Broker地址?他会去NameServer获取路由信息,得到Broker的地址为localhost:10909,然后通过网络通信将消息发送给Broker。生产者发送的消息通过网络传输给Broker,Broker需要按照一定的结构存储消息。存储完成后,通知生产者存储结果。消息接收publicclassConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{//实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name");//设置NameServer地址consumer.setNamesrvAddr:9876");//订阅一个或多个Topics和Tags来过滤需要消费的消息>msgs,ConsumeConcurrentlyContextcontext){System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);//标记消息已成功消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例consumer.start();}}首先实例化一个consumer消费者,告诉它NameServer的地址,这样consumer就可以从NameServer获取路由信息,然后consumer需要知道自己可以消费哪些Topics的Messages,也就是每个consumer都需要订阅一个或多个主题。消费者也需要做一些初始化。业务本身并不关心如何从Broker中拉取消息。这些都是消费者不为人知的贡献。因此,我们需要启动消费者,消费者会从NameServer中拉取路由信息,并不断从Broker中拉取消息。获取的消息提供给业务定义的MessageListener。消息拉回后,消费应该怎么办?每个消费者都是不同的(由业务自己决定),由我们业务定义的MessageListener来处理。处理完之后,消费者还需要确认收货,也就是告诉Broker消费成功。以上就是本文的全部内容。本文不堆砌太多无意义的概念,不讲调峰解耦和异步通信。网上也有很多这样的内容,看过没看过没什么区别。到最后,懂的点个赞,不懂的存起来,顺便分享给你的朋友。还没有关注的朋友,记得关注,投资股票不会亏本的。参考资料RocketMQ官方文档RocketMQ源码丁伟,周继峰。RocketMQ技术内幕:RocketMQ架构设计与实现原理。机械工业出版社,2019-01.李伟。RocketMQ分布式消息中间件:核心原则和最佳实践。电子工业出版社,2020-08.杨开元。RocketMQ实战与原理分析。机械工业出版社,2018-06.转载请注明出处