当前位置: 首页 > 科技观察

本文将带你熟悉SpringBoot+RabbitMQ方式收发消息

时间:2023-03-22 17:34:14 科技观察

本文将集成SpringBoot,采用自动配置的方式开发。我们只需要声明RabbitMQ的地址即可。关于创建和关闭连接有各种各样的事情。让Spring来帮我们吧~让Spring帮我们管理连接,让我们专注于业务逻辑,和声明式事务一样简单易用、方便高效。祝您收获满满,先赞后观,幸福无限。本文代码:https://gitee.com/he-erduo/spring-boot-learning-demohttps://github.com/he-erduo/spring-boot-learning-demo1。🔍环境配置先从环境的配置说起。在上一篇文章中,我们介绍了自动配置包。既然我们已经使用了自动配置的方式,那么我们可以直接将RabbitMQ的连接信息放在配置文件中,就像我们需要使用JDBC连接时,也配置DataSource一样。如图,我们只需要指定连接IP+端口号和用户名密码即可。这里我使用默认的用户名和密码。主要是看手动确认消息的配置。需要配置为manual,进行手动确认。以后还会有其他的配置项。现在,我们可以配置这个。接下来,我们需要配置一个队列。在上一篇文章中,我们将消息发送到名为erduo的队列。当时,我们手动定义了这个队列。这里我们也需要手动配置,声明一个Bean即可。@ConfigurationpublicclassRabbitmqConfig{@BeanpublicQueuerduo(){//它的三个参数:durableexclusiveautoDelete//一般只设置持久化为returnnewQueue("erduo",true);}}声明就这么简单,当然RabbitMQ毕竟是一个独立的组件。如果你已经在RabbitMQ中通过其他方式创建了一个名为erduo的队列,则无需在此声明。这里的一个作用是,如果你没有这个队列,它会按照你的声明方式帮你创建这个队列。配置好环境后,我们就可以用SpringBoot的方式编写生产者和消费者了。2.📕Producer和RabbitTemplate的节奏跟上一篇一样。先写生产者吧,不过这次要介绍一个新的工具:RabbitTemplate。听名字就知道是另一个即用型工具类。Spring家族在这方面是很自在的。一切都为您打包,让您使用起来更方便、更流畅。RabbitTemplate实现了标准的AmqpTemplate接口,其功能大致可以分为发送消息和接收消息。我们这里在生产者中使用它,主要是利用它的消息发送功能:send和convertAndSend方法。//发送消息到默认的Exchange,使用默认的routingkeyvoidsend(Messagemessage)throwsAmqpException;//使用指定的routingkey发送消息到默认的exchangevoidsend(StringroutingKey,Messagemessage)throwsAmqpException;//使用指定的routingkey发送到指定交换的消息voidsend(Stringexchange,StringroutingKey,Messagemessage)throwsAmqpException;send方法是发送字节数组数据的方式,这里代表消息内容的对象是Message对象,它的构造方法是传入字节数组数据,所以我们需要将我们的数据转换成字节数组,然后构造放入消息对象并发送。//对象类型,可以传入POJOvoidconvertAndSend(Objectmessage)throwsAmqpException;voidconvertAndSend(StringroutingKey,Objectmessage)throwsAmqpException;voidconvertAndSend(Stringexchange,StringroutingKey,Objectmessage)throwsAmqpException;convertAndSend方法可以传入POJO对象作为参数,verter底层有一个MessageConverter帮助我们自动将数据转换成byte类型或者String或者序列化类型。所以这里只支持三种类型的传入对象:byte类型、String类型和实现了Serializable接口的POJO。介绍完之后,我们可以看一下代码:@Slf4j@Component("rabbitProduce")publicclassRabbitProduce{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsend(){Stringmessage="你好,我是作者耳,欢迎关注我。"+LocalDateTime.now().toString();System.out.println("Messagecontent:"+message);//指定消息类型MessagePropertiesprops=MessagePropertiesBuilder.newInstance().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();rabbitTemplate.send(Producer.QUEUE_NAME,newMessage(message.getBytes(StandardCharsets.UTF_8),props));System.out.println("消息已发送。");}publicvoidconvertAndSend(){Useruser=newUser();System.out.println("Messagecontent:"+user);rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);System.out.println("消息已发送。");}}这里我具体写两个例子,一个用于测试发送,另一个用于测试convertAndSend。在send方法中,我们可以看到和前面的代码差不多,定义一个消息,然后直接发送,但是这个消息的构造方法可能比我们想象的多了一个参数,我们本来说的是把数据转成二进制数组可以放进去,现在好像还需要多放一个参数。MessageProperties,是的,我们需要添加一个额外的MessageProperties对象。从它的名字我们也可以看出它的作用就是附加一些参数,但是有些参数是不可或缺的,没有它们是无法工作的。比如我这里的代码是设置消息的类型。消息的类型有很多种,可以是二进制类型,文本类型,也可以是序列化类型,JSON类型。我这里设置的是文本类型,需要指定的类型。它可以为我们在获取到消息后要将消息转换成什么样的对象提供一个参考。convertAndSend方法要简单得多。这里我放了一个User对象来测试,只要指定队列,放到这个对象中即可。提示:用户必须实现Serializable接口,否则调用该方法时会抛出IllegalArgumentException。代码完成后,我们就可以调用了。这里我写了一个测试类调用:@SpringBootTestpublicclassRabbitProduceTest{@AutowiredprivateRabbitProducerabbitProduce;@TestpublicvoidsendSimpleMessage(){rabbitProduce.send();rabbitProduce.convertAndSend();}}效果如下图~同时控制使用命令rabbitmqctl.batlist_queues查看queue-erduo的当前情况:这样我们的producer测试就完成了,现在消息队列中有两条消息,而且消息类型肯定是不同的,一条是我们设置的文本类型,一条是自动设置的序列化类型。3.📗Consumer和RabbitListener现在队列中有了消息,我们需要看看如何获??取消息并以新的方式消费和确认消息。这里的Consumers我们需要使用@RabbitListener来帮我们获取指定队列的消息。它的用法很简单也很复杂。可以先说简单的方式,直接放在方法上,指定要监听的队列。@Slf4j@Component("rabbitConsumer")publicclassRabbitConsumer{@RabbitListener(queues=Producer.QUEUE_NAME)publicvoidonMessage(Messagemessage,Channelchannel)throwsException{System.out.println("Messagecontent:"+message);channel.basicAck(messageget.getMessage().getDeliveryTag(),false);System.out.println("Themessagehasbeenconfirmed");}}这段代码表示onMessage方法将处理erduo队列中的消息(Producer.QUEUE_NAME为常量字符串“erduo”)。我们可以看到这个方法中有两个参数,Message和Channel。如果不使用Channel,这个参数可以省略,但是必须要有Message消息,代表消息本身。我们可以想一想,我们的程序从RabbitMQ中拉回消息后,会如何展示给我们呢?没错,它被封装成了Message对象,里面包含了一条消息的所有信息,我跑一会就知道数据结构是什么样的了。同时,这里我们使用Channel来进行消息确认操作。这里的DeliveryTag代表消息在队列中的序号,这个信息保存在MessageProperties中。4.📖SpringBoot启动!写好生产者和消费者,并运行生产者将两条信息放入消息队列后,我们就可以直接启动消息,查看消费情况:在红框标注的地方可以看到,因为我们在那里都是消费者,所以项目启动后,与RabbitMQ建立连接,监控队列。然后我们开始消费我们队列中的两条消息:第一条消息是contentType=text/plain类型的,所以直接在控制台打印具体内容。第二条消息是contentType=application/x-java-serialized-object,打印的时候只打印一个内存地址+字节大小。不管怎样,我们都拿到了数据,说明我们的消费没有问题,我们也进行了消息确认操作。从数据上看,整个消息可以分为body和MessageProperties两部分。我们可以单独使用注解来获取这个主体的内容-@Payload@RabbitListener(queues=Producer.QUEUE_NAME)publicvoidonMessage(@PayloadStringbody,Channelchannel)throwsException{System.out.println("Messagecontent:"+body);}也可以使用单个注解获取MessageProperties的headers属性,截图中也可以看到headers属性,但是是空的——@Headers。@RabbitListener(queues=Producer.QUEUE_NAME)publicvoidonMessage(@PayloadStringbody,@HeadersMapheaders)throwsException{System.out.println("Messagecontent:"+body);System.out.println("Messageheaders:"+headers);}这两个注解算得上是扩展知识了,我还是比较喜欢直接全部获取,全部!!!以上我们就完成了消息的发送和消费。我们可以再次回忆整个过程。一切都按照我画的图一样的轨迹:只是我们没有指定Exchage一直使用的默认路由。我希望你能记住它。活这张照片。5.📘@RabbitListener和@RabbitHandler接下来补充一些关于@RabbitListener和@RabbitHandler的知识点。@RabbitListener上面我们简单的使用了它,稍微扩展一下它其实可以监听多个队列,像这样:@RabbitListener(queues={"queue1","queue2"})publicvoidonMessage(Messagemessage,Channelchannel)throwsException{System.out.println("Messagecontent:"+message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)System.out.println("Messagecontent:"+message);}以及其他一些功能,例如如绑定等,这里不再赘述,因为它们过于硬编码,一般不会使用。说说本节的一个主要特点:@RabbitListener和@RabbitHandler的组合。前面我们并没有提到@RabbitListener注解其实是可以注解在类上的。该注解标记该类监听某个队列或某些队列。这两个注解的组合需要在类上注解@RabbitListener,然后在方法上注解@RabbitHandler,根据方法参数自动识别和消费。写个例子给大家看看比较直观。@Slf4j@Component("rabbitConsumer")@RabbitListener(queues=Producer.QUEUE_NAME)publicclassRabbitConsumer{@RabbitHandlerpublicvoidonMessage(@PayloadStringmessage){System.out.println("Messagecontent:"+message);}@RabbitHandlerpublicvoidonMessage(@SystemPayload){.out.println("Messagecontent:"+user);}}可以看看这个例子,我们先使用@RabbitListener来监听erduo队列中的消息,然后使用@RabbitHandler注解两个方法。第一个方法的主体类型是String类型,也就是说这个方法只能处理文本类型的消息。第二个方法的主体类型是User类型,也就是说这个方法只能处理序列化类型和User类型的消息。这两个方法对应的是我们的测试类在第二节要发送的两条消息,所以我们发送两条测试消息给RabbitMQ来测试这段代码,看看效果:两者都像往常一样打印在控制台上现在,如果没有@RabbitHandler注解的方法可以匹配你消息的类型,比如消息都是byte数组类型,没有对应的方法接收,系统会继续在控制台报错,如果出现这种情况你,证明你的类型写错了。假设你的erduo队列中会存在三种类型的消息:byte、text和serialization,那么你必须有相应的方法来处理这三种消息,否则消息发送的时候会因为无法正确转换错误而失败.而且在使用了@RabbitHandler注解之后,就不能再像以前一样使用Message作为接收类型了。@RabbitHandlerpublicvoidonMessage(Messagemessage,Channelchannel)throwsException{System.out.println("Messagecontent:"+message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息确认");}这样写会报类型转换异常,所以两者选其一。同时,我上面的@RabbitHandler也没有确认消息。您可以自己尝试确认消息。6.📙消息的序列化转换从上面我们已经知道,可以自动转换的对象只有byte[]、String和java序列化对象(实现了Serializable接口的对象),但并不是所有的Java对象都会实现Serializable接口,并且序列化过程使用了JDK自带的序列化方式,效率低下。所以我们比较常见的做法是:使用Jackson将数据转成JSON格式发送给RabbitMQ,然后在接收消息时使用Jackson反序列化数据。这样可以完美解决上面的痛点:消息对象不需要实现Serializable接口,而且效率也比较高(Jackson序列化效率应该是业界最好的)。默认的消息转换方案是消息转换顶层接口-MessageConverter的子类:SimpleMessageConverter。如果我们要切换到另一个消息转换器,我们只需要更换这个转换器。上图是MessageConverter结构树的结构树。可以看到除了SimpleMessageConverter之外还有一个Jackson2JsonMessageConverter。我们只需要将它定义为一个Bean就可以直接使用这个转换器了。@BeanpublicMessageConverterjackson2JsonMessageConverter(){returnnewJackson2JsonMessageConverter(jacksonObjectMapper);}这样就可以了,这里的jacksonObjectMapper是不能传入的,但是默认的ObjectMapper方案对JDK8的时间日期序列化不是很友好,具体可以参考我的上一篇:从LocalDateTime序列化讨论全局一致性序列化,总的来说,定义自己的ObjectMapper。同时,为了下次测试方便,我定义了一个专门测试JSON序列化的队列:@BeanpublicQueuerduoJson(){//它的三个参数:durableexclusiveautoDelete//一般设置持久化为returnnewQueue("erduo_json",true);}在此之后,您可以首先测试生产者代码:);System.out.println("Themessagehasbeensent.");}我重新定义了一个Client对象,和前面测试用到的User对象成员变量一样,不同的是没有实现可序列化的接口。同时,为了保留之前的测试代码,我新建了一个RabbitJsonConsumer来测试JSON序列化相关的消费代码,它定义了一个静态变量:JSON_QUEUE="erduo_json";所以这段代码将Client对象作为消息发送到“erduo_json”队列中,然后我们在测试类中运行它发送一次。接下来是消费者代码:@Slf4j@Component("rabbitJsonConsumer")@RabbitListener(queues=RabbitJsonConsumer.JSON_QUEUE)println("Messagecontent:"+client);System.out.println("Messageheaders:"+headers);channel.basicAck((Long)headers.get(AmqpHeaders.DELIVERY_TAG),false);System.out.println("消息已经确认”);}}经过上面的体验,这段代码非常简单易懂,同时也给出了上一节没有写的@RabbitHandler模??式下signfor发送消息的方式。直接看效果:在打印出来的Headers中,回头可以看到contentType=application/json,这个contentType表示消息的类型,这就说明我们新的消息转换器生效了,convert所有的消息都被转换了为JSON类型。后记Thes两篇文章已经完成了RabbitMQ消息的基本发送和接收,包括手动配置和自动配置两种方式。仔细看完这些,你应该不会对用RabbitMQ收发消息产生疑惑了~不过我们发送消息的时候一直都是默认的。开关,下篇文章将介绍RabbitMQ的几种开关类型以及使用方法。

最新推荐
猜你喜欢