当前位置: 首页 > 后端技术 > Java

一图进阶RocketMQ——整体架构

时间:2023-04-01 23:36:22 Java

前言三、看了好几本书,看了很多源码一图进阶RocketMQ图片链接,关于RocketMQ的你只需要记住这张图!如果您是第一次看到这个系列,墙裂建议您打开链接。觉得不错记得点赞关注哦。视频在B站同步更新(B站:三君子),欢迎看图进阶ROcketMQ——整体架构(视频版),记得调整视频分辨率~https://www.bilibili.com/视频...本文是《一张图进阶 RocketMQ》系列的第一篇。今天的内容主要分为两个部分:整体架构:RocketMQ的完整架构将从大家熟悉的“生产者消费者模型”开始逐步介绍。你只需要记住一张完整的架构图就够了。消息收发示例:通过Docker部署RocketMQ,用简单的例子连接RocketMQ的消息收发流程。总体架构什么是消息队列?顾名思义,首先要有一个队列,用来存储消息。然后就是消息队列,有人放进去,有人拿进去。有没有似曾相识的感觉,这就是连小学生都知道的经典“生产者消费者模型”吗?接下来我们就来看看它里面穿的是什么?别着急,我们先来重温一下“生产者消费者模型”的老朋友。简单来说,这个模型由两种线程和一个队列组成:生产者线程:生产产品,放入队列。消费者线程:从队列中获取产品并消费。有了这个队列,生产者只需要关注生产,不需要关心消费者的消费行为,更不需要等待消费者线程执行完毕;消费者只需要消费,不管生产者如何生产,更不用说等待生产者生产了。这意味着生产者和消费者是解耦和异步的。这是古老而强大的。在以后的工作和学习中,你会不断遇到这些概念,但越看越看不懂。我们的很多生活都是异步的。比如最近新冠疫情卷土重来。我点的外卖只能送到小区门口的外卖排队处,我也只能到外卖排队处取外卖,然后狼吞虎咽。如何实现“生产者消费者模型”?我想每个上小学的人都学过。让我们看看这个模型存在哪些问题。最大的问题是我们小学时学的“生产者消费者模型”是单机版的,只能自己打气了。这就相当于,我是一个外卖骑手,我点了一份外卖放到外卖队列,然后我从外卖队列中取走,操作猛如虎!于是就有了进化版。我们把消费者、队列和生产者放在不同的服务器上。这就是传说中的分布式消息队列。生产者生产的消息通过网络传递到队列存储,消费者通过网络从队列中获取消息。但是还有一个问题。新闻可能有很多种。把它们都放在一起不是很乱吗?我点的外卖和快递都是放在一起的,太难找了。所以我们需要区分不同类型的消息,相同类型的消息称为一个Topic。同时骑手不可能只有一个,点餐也不会只有我一个,所以有生产者群体,也有消费者群体。但是还有一个问题。社区太大了,一个队列放不下。我住在小区南门,点了外卖还要到北门去取,真是蛋疼。因此,物业在东南门和西北门分别设置了外卖快递点。即我们有多个队列组成一个队列集群。然而,问题又来了(没完没了),一个小区派送的队列那么多,骑手怎么知道去哪里派送,我又怎么知道去哪里取货?很简单,导航。我们称导航信息为路由信息。这些信息需要管理。它告诉生产者一个主题消息可以发送到哪些队列,同时告诉消费者可以从哪些队列中取出你需要的消息。RocketMQ为这些路由信息设置了管理员NameServer。当然也可以有很多NameServer组成一个NameServer集群。至此,你应该知道RocketMQ里面有什么了。包括生产者(Producer)、消费者(Consumer)、NameServer和队列本身(Broker)。Broker是代理的意思,负责队列的访问等操作。我们可以将Broker理解为队列本身。NameServer:我们可以同时部署很多NameServer服务器,这些服务器是无状态的,节点之间没有信息同步。NameServer唤醒后,监听端口,等待Broker、Producer、Consumer连接。NameServer是集群元数据管理中心。Broker:Broker启动,与所有NameServers保持长连接,每30s发送一个心跳包(像心跳一样持续稳定发送请求)。心跳包中包含当前的Broker信息(IP+端口等),存储了所有的Topic信息。注册成功后,在NameServer集群中就有了Topic和Broker的映射关系。我们可以同时部署多个MasterBroker和多个SlaveBroker。一个Master可以对应多个Slave,但一个Slave只能对应一个Master。Master和Slave需要有相同的BrokerName但不同的BrokerId。BrokerId为0表示Master,非0表示Slave,但只有BrokerId=1的slave服务器才会参与消息读取负载。(Broker的主从作用可以暂时忽略)Topic:在发送和接收消息之前,先创建一个Topic。创建Topic时需要指定Topic存储在哪些Brokers上,也可以在发送消息时自动创建Topic。生产者:生产者发送消息。启动时,首先与其中一个NameServer集群建立长连接,并从NameServer中获取当前发送的Topic存在于哪些Brokers上,使用轮询策略选择一个队列,然后与该队列所在的Broker通信位于。建立长连接,向Broker发送消息。消费者:消费者类似于生产者。与其中一台NameServer建立长连接,获取当前订阅的Topic存在于哪些Broker上,然后直接与Broker建立连接通道开始消费消息。刚才我们提到骑手不止一个,接外卖的不只我一个,所以就会有生产者群和消费者群的概念。这里需要补充一点,消息分为集群消息和广播消息:集群消息:一个topic一个消息,一个consumergroup只能被一个consumer实例消费。比如同一个外卖话题,一个外卖,我们整个社区只有一个人消费,这就是集群消费。广播消息:一个topic的一条消息会被一个consumergroup的所有consumer实例消费。比如政府因为疫情发食品,我们社区的每个人都会去消费,也就是广播消费。消息收发实例RocketMQ部署我们刚刚了解了RocketMQ的整体架构,那么如何通过RocketMQ收发消息呢?需要通过Docker部署一套RocketMQ:如果没有安装Docker,可以按照菜鸟教程MacOSDocker安装/WindowsDocker安装安装。然后,通过docker-compose部署RocketMQ:clonedocker-middleware仓库,打开dockers目录;修改./conf/broker.conf中的brokerIP1参数为本地IP;进入docker-compose.yml文件所在路径,执行docker-composeup命令即可;注意:如果你现在不了解Docker也没关系,按照步骤部署RocketMQ即可,不妨碍我们对RocketMQ相关内容的理解。部署完成后,我们可以在DockerDashboard中看到RocketMQ相关的容器,包括Broker、NameServer、Console(RocketMQ控制台)。这里我们可以使用部署好的RocketMQ来收发消息。RocketMQ部署完毕,我们来看一个简单的消息收发示例,可以说是RocketMQ的“HelloWorld”。消息发送publicclassSyncProducer{publicstaticvoidmain(String[]args)throwsException{//实例化消息生产者ProducerDefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name");//设置NameServer的地址producer.setNamesrvAddr("localhost:9876");//启动Producer实例producer.start();//创建消息并指定Topic、Tag和消息体Messagemsg=newMessage("Topic1","Tag","Key","Helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET));//向Broker发送消息SendResultsendResult=producer.send(msg);//通过sendResult返回消息是否发送成功System.out.printf("%s%n",sendResult);//如果不再发送消息,则关闭Producer实例。生产者.shutdown();}}首先实例化一个producer生产者,告诉它NameServer的地址,这样producer就可以从NameServer获取路由信息。然后producer要做一些初始化(这是很关键的一步),需要和NameServer通信,需要先建立通信连接。生产者准备好了,所以它要准备发送的内容,发送“HelloWorld”到Topic1。当内容准备好后,生产者就可以发送消息了。生产者如何知道Broker地址?他会去NameServer获取路由信息,得到Broker的地址为localhost:10909,然后通过网络通信将消息发送给Broker。生产者发送的消息通过网络传输给Broker,Broker需要按照一定的结构存储消息。存储完成后,通知生产者存储结果。消息接收publicclassConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{//实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name");//设置NameServer地址consumer.setNamesrvAddr:9876");//订阅一个或多个Topics和Tags来过滤需要消费的消息>msgs,ConsumeConcurrentlyContextcontext){System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);//标记消息已成功消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例consumer.start();}}首先实例化一个consumer消费者,告诉它NameServer的地址,这样consumer就可以从NameServer获取路由信息,然后consumer需要知道自己可以消费哪些Topics的Messages,也就是每个consumer都需要订阅一个或多个主题。消费者也需要做一些初始化。业务本身并不关心如何从Broker中拉取消息。这些都是消费者不为人知的贡献。因此,我们需要启动消费者,消费者会从NameServer中拉取路由信息,并不断从Broker中拉取消息。获取的消息提供给业务定义的MessageListener。消息被拉回后,消费者该如何处理?每个消费者都是不同的(由业务自己决定),由我们业务定义的MessageListener来处理。处理完之后,消费者还需要确认收货,也就是告诉Broker消费成功。以上就是本文的全部内容。本文不堆砌太多无意义的概念,不讲调峰解耦和异步通信。网上也有很多这样的内容,看过没看过没什么区别。最后,懂的点赞,不懂的救。来这里交个朋友,遇到什么问题都可以和三位大师分享。参考资料RocketMQ官方文档RocketMQ源码丁伟,周继峰。RocketMQ技术内幕:RocketMQ架构设计与实现原理。机械工业出版社,2019-01.李伟。RocketMQ分布式消息中间件:核心原则和最佳实践。电子工业出版社,2020-08.杨开元。RocketMQ实战与原理分析。机械工业出版社,2018-06.转载请注明出处