当前位置: 首页 > 后端技术 > Java

RocketMQ保姆级教程

时间:2023-04-02 00:10:25 Java

上周花了一点时间从头到尾,从零搭建了一个RocketMQ环境。我觉得挺容易的,所以写了一篇文章分享给大家。整篇文章大致可以分为三个部分。第一部分属于一些核心概念和工作流程的解释;第二部分是手动搭建一套环境;第三部分是根据环境测试并集成到SpringBoot中。过程比较详细,所以我称之为“保姆级教程”。好了废话不多说,直接进入正题。前言RocketMQ是阿里巴巴旗下开源的MQ框架。经历了双十一和Java编程语言实现的考验,有着非常完善的生态体系。RocketMQ作为一个纯java、分布式、队列模型的开源消息中间件,支持事务性消息、顺序消息、批量消息、定时消息、消息回溯等,简而言之就是葛大爷一句话NameServer的核心理念:Itcan可以理解为是一个注册中心,主要用于保存主题路由信息和管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有通信的。Broker:核心角色,主要用于保存主题信息,接受生产者产生的消息,持久化消息。在Broker集群中,同一个BrokerName可以称为一个Broker组。在一个Broker组中,BrokerId为0为主节点,其他为从节点。BrokerName和BrokerId可以在Broker启动时通过配置文件进行配置。每个Broker组只存储部分消息。生产者:生产消息的一方是生产者。生产者组:一个生产者组可以有多个生产者。只需要在创建生产者的时候指定生产者组,那么生产者就会在那个生产者组中consumer:一个消费者组,用来消费生产者消息:和生产者一样,每个消费者都有一个消费者组。一个消费者组可以有很多消费者。不同的消费者群体消费消息的方式不同。做作的。topic(主题):可以理解为消息集合的名称。当生产者发送消息时,他需要指定将消息发送到哪个主题。消费者在消费消息时,还需要知道自己消费的是哪些主题。Tag(subtopic):比topic低一级,可用于区分同一topic下不同业务类型的消息。发送消息时也需要指定。这里组的概念是因为它可以用来对不同的生产者组或者消费者组进行不同的配置,可以让生产者或者消费者更加灵活。工作流说完核心概念,我们再来说说核心工作流。这里我先画一张图。从这张图可以清楚的知道RocketMQ的大致工作流程:Broker启动时,会向各个NameServer注册自己的信息(因为NameServer之间不通信,所以要各自注册),信息包括你的自己的ip和端口号,自己的Broker有什么topic等等信息。Producer启动后,会与NameServer建立连接,并定期从NameServer获取Broker信息。发送消息时,会根据消息需要发送到哪个topic找到对应的Broker地址。如果有,就向这个Broker发送请求;如果没有找到,则根据是否允许自动创建主题来决定是否发送消息。Broker收到Producer的消息后,会保存消息并持久化。如果有从节点,也会主动同步到从节点,实现数据备份。获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应Broker的地址,然后和Broker建立连接,获取消息,消费就像上图一样.整体工作流程相对简单,这里我简化了很多概念,主要是为了更好的理解。环境搭建终于说完了一些简单的概念,接下来我们来搭建一个RocketMQ环境。通过上面的分析,我们知道RocketMQ中有四种角色:NameServer、Broker、producer、consumer。生产者和消费者其实就是业务系统,这里就不用建了。真正要构建的是NameServer和Broker。但是为了方便RocketMQ数据的可视化,我这里额外搭建了一套可视化服务。施工过程比较简单,按步骤一步步完成即可。如果有些命令不存在,直接通过yum安装这些命令即可。一、准备工作需要准备一台linux服务器,需要先安装JDK,关闭防火墙systemctlstopfirewalldsystemctldisablefirewalld复制代码下载并解压RocketMQ1,创建目录存放rocketmq相关的东西mkdir/usr/rocketmqcd/usr/rocketmq复制代码2.下载解压rocketmq下载wgethttps://archive.apache.org/di...复制代码解压unziprocketmq-all-4.7.1-bin-release.zip复制代码看到这个文件夹tocomplete然后进入rocketmq-all-4.7.1-bin-release文件夹cdrocketmq-all-4.7.1-bin-releasecopycodeRocketMQstuffishere2.搭建NameServer并修改jvm参数在启动NameServer之前,强建议在启动时修改jvm参数,因为默认的参数比较大,为了避免内存不足,建议修改小一些,当然如果你的内存足够大,可以忽略。vibin/runserver.sh复制代码,修改圈出的那一行这里可以直接修改成和我的一样-server-Xms512m-Xmx512m-Xmn256m-XX:MetaspaceSize=32m-XX:MaxMetaspaceSize=50m复制代码和启动NameServer修改后,执行以下命令启动NameServernohupshbin/mqnamesrv&复制代码查看NameServer日志tail-f~/logs/rocketmqlogs/namesrv.log复制代码如果看到如下日志,说明NameServer启动成功Log3,BuildBroker这里启动单机版Broker。修改jvm参数和启动NameServer是一样的。也建议修改jvm参数vibin/runbroker.sh把代码复制过来,把画圈的地方改小一点。和我设置的一样-server-Xms1g-Xmx1g-Xmn512m复制代码修改Broker配置文件broker.conf这里需要更改Broker配置文件,需要指定NameServer的地址,因为Broker需要注册用NameServerviconf/broker.confcopyCodeBroker配置文件Broker配置文件这里可以看到Broker的配置,Broker集群的名称,Broker的名称,Broker的id,所有这些符合以上。在文件末尾添加地址namesrvAddr=localhost:9876复制代码,因为NameServer和Broker在同一台机器上,所以是localhost,nameserver默认端口是9876。不过这里我也建议多修改一个一条信息,因为Broker向NameServer注册时,如果不指定会自动获取带进来的ip,但是自动获取有一个坑,就是你的电脑可能无法访问这个ip是自动获取的,所以我建议手动指定你的电脑可以访问的服务器ip。我的虚拟机的ip是192.168.200.143,所以我指定为192.168.200.143,如下:brokerIP1=192.168.200.143brokerIP2=192.168.200.143复制代码如果以上都配置好了,最终的配置文件应该如下,红圈是新增的startBrokernohupshbin/mqbroker-cconf/broker.conf©code-c参数是指定配置文件查看日志tail-f~/logs/rocketmqlogs/broker.logcopycode当看到如下日志,则表示启动成功4.搭建可视化控制台其实前面的NameServer和Broker搭建完成后,就可以用来收发消息了,但是为了更加直观,可以构建一套可视化服务。可视化服务其实就是一个jar包,启动即可。jar包可以从这个链接获取:pan.baidu.com/s/16s1qwY2q...提取码:s0sd将jar包上传到服务器,放在/usr/rocketmq目录下。当然,你把它放在哪里并不重要。这只是为了方便。因为rocketmq的一切都在这里,进入/usr/rocketmq,执行如下命名nohupjava-jar-server-Xms256m-Xmx256m-Drocketmq.config.namesrvAddr=localhost:9876-Dserver.port=8088rocketmq-console-ng-1.0.1.jar&复制代码rocketmq.config.namesrvAddr用于指定NameServer查看日志的地址tail-f~/logs/consolelogs/rocketmq-console.log复制代码当看到如下日志时,说明启动成功然后在浏览器输入http://linuxserverip:8088/就可以看到控制台了。如果不能访问,可以检查防火墙是否关闭。您可以在右上角将语言切换为中文。Broker集群信息主题信息通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。功能很多,这里就不一一介绍了。测试环境搭建完成后,就可以进行测试了。引入依赖org.apache.rocketmqrocketmq-client4.7.1copycodeproducersendmessagepublicclassProducer{publicstaticvoidmain(String[]args)throwsException{//创建一个生产者,指定生产者组为sanyouProducerDefaultMQProducerproducer=newDefaultMQProducer("sanyouProducer");//指定NameServer的地址producer.setNamesrvAddr("192.168.200.143:9876");//第一次发送可能会超时,我设置大了producer.setSendMsgTimeout(60000);//启动生产者producer.start();//为Tanys创建消息//topic//消息内容为三友的java日记//标签为TagAMessagemsg=newMessage("sanyouTopic","TagA","Sanyou'sjavadiary".getBytes(RemotingHelper.默认字符集);//发送并获取消息发送结果,然后打印SendResultsendResult=producer.send(msg);System.out.printf("%s%n",sendResult);//关闭生产者producer.关机();}}复制代码构建消息生产者DefaultMQProducer实例,然后指定生产者组为sanyouProducer;指定NameServer的地址:服务器的ip:9876,因为需要从NameServer中拉取Broker信息producer.start()启动producer构建一条消息,内容为三友的java日记,然后指定这条消息发送到topicsanyouTopicproducer.send(msg):发送消息,打印结果并关闭producer。TherunningresultisasfollowsSendResult[sendStatus=SEND_OK,msgId=C0A81FAF54F818B4AAC2475FD2010000,?offsetMsgId=C0A8C88F00002A9F000000000009AE55,?messageQueue=MessageQueue?[topic=sanyouTopic,?brokerName=broker-a,?queueId=0],?queueOffset=0]复制代码sendStatus=SEND_OK说明发送成功了,此时就可以后台查看到未消费到控制台查看消息,然后选择要发送的主题,手动选择查询的时间范围,如果不选择,可以找出来(我怀疑这是个bug),然后查询就可以看到消息了。然后单击MESSAGEDETAIL以查看详细信息。您可以在此处查看已发送消息的详细信息。左下角消息的消费,因为我们没有消费者订阅这个主题,所以左下角没有数据。消费者消费消息public?class?Consumer?{????public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{????????//?通过push模式消费消息,指定消费者组????????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("sanyouConsumer");????????//?指定NameServerAddressconsumer.setNamesrvAddr("192.168.200.143:9876");//Subscribetoallnewsunderthistopicconsumer.subscribe("sanyouTopic","*");//Registeraconsumerlistener,whenthereisnews的时候,会回调这个监听器来消费消息????????consumer.registerMessageListener(new?MessageListenerConcurrently()?{????????????@Override????????????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,????????????????????????????????????????????????????????????ConsumeConcurrentlyContext?context)?{????????????????for?(MessageExt?msg?:?msgs)?{????????????????????System.out.printf("Consumemessage:%s",newString(msg.getBody())+"\n");}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}});//启动消费者consumer.start();System.out.printf("消费者开始.%n");}}复制代码创建consumer实例对象,指定消费组为sanyouConsumer指定NameServer的地址:服务器ip:9876订阅sanyouTopic的所有信息consumer.registerMessageListener,这个很重要,就是注册一个监听器,这个监听器会在有消息的时候回调这个监听器,对消息进行处理,所以用户需要实现这个接口,然后Processingmessagestartconsumer启动后,consumer会消费producer刚才发送的消息,因此控制台将打印出以下消息ConsumerStarted。消费消息:三友的java日记复制代码现在去控制台,发现是sanyouConsumer这个消费组被消费了。SpringBoot环境下集成的RocketMQ集成肯定不会像上面测试的那样在实际项目中使用,都是用SpringBoot集成的。1.引入依赖org.apache.rocketmqrocketmq-spring-boot-starter2.1.1org.springframework.bootspring-boot-starter-test2.1.1.RELEASE复制代码2,yml配置rocketmq:producer:group:sanyouProducername-server:192.168.200.143:9876复制代码3.SpringBoot下创建消费者只需要实现RocketMQListener接口,然后添加@RocketMQMessageListener注解@Component@RocketMQMessageListener(consumerGroup="sanyouConsumer",topic="sanyouTopic")publicclassSanYouTopicListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmsg){System.out.println("处理消息:"+msg);}}复制代码@RocketMQMessageListener需要指定哪个消费者属于Consumer组,消费哪个主题,NameServer的地址已经通过yml配置文件配置好了4.测试@SpringBootTest(classes=RocketMQApplication.class)@RunWith(SpringRunner.class)publicclassRocketMQTest{@AutowiredprivateRocketMQTemplate模板;@Testpublicvoidsend()throwsInterruptedException{template.convertAndSend("sanyouTopic",'三友的java日记');线。睡眠(60000);}}复制代码直接注入一个RocketMQTemplate,然后通过RocketMQTemplate发送消息。运行结果如下:处理消息:三友java日记的复制代码确实消费了消息。原理其实是一样的,只是在SpringBoot中封装了一层,方便使用。1、RocketMQTemplate构造代码从这里可以看出,在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以可以想象,RocketMQTemplate最终发送的消息也是通过DefaultMQProducer发送的。2、@RocketMQMessageListener注解处理从这里可以看出,每一个注解了@RocketMQMessageListener的对象都会创建一个DefaultMQPushConsumer,所以消息最终是通过DefaultMQPushConsumer消费的。至于监听器,就是在这里遍历每条消息,然后调用handleMessage,最后调用实现了RocketMQListener的对象来处理消息。最后,通过以上的理论介绍和实际环境搭建加上代码测试,相信大家应该可以上手RocketMQ了。有兴趣的小伙伴可以手动设置。如果整个过程顺利,可能需要十到二十分钟。.最后,再说一件事。从整篇文章可以看出,本文并没有涉及到一些机制和原理的太深入的讲解,比如消息是如何存储的,事务和延迟消息是如何实现的,主从同步是如何实现的等等,我压根就没提队列这个词,主要是打算后面写一篇文章,分别分析这些机制和原理。最后附上本文所有代码地址:github.com/sanyou3/roc...