当前位置: 首页 > 科技观察

SpringBoot+RabbitMQ实现RPC调用

时间:2023-03-12 04:53:44 科技观察

说到RPC(RemoteProcedureCallProtocol远程过程调用协议),小伙伴们脑海中浮现的估计有RESTfulAPI、Dubbo、WebService、JavaRMI、CORBA等。其实RabbitMQ也为我们提供了RPC功能,使用起来非常简单。今天宋哥就通过一个简单的案例和大家分享一下SpringBoot+RabbitMQ是如何实现一个简单的RPC调用的。注意,有些小伙伴可能对RabbitMQ实现RPC调用有一些误解,认为这不简单?创建两个消息队列queue_1和queue_2。首先,client向queue_1发送消息,server在queue_1上监听消息。收到后,进行处理;处理完成后,服务端向queue_2队列发送消息,然后客户端监听queue_2队列上的消息,这样就知道服务端的处理结果了。这个方法也不是不行,就是有点麻烦!RabbitMQ提供了现成的解决方案,可以直接使用,非常方便。接下来我们一起学习吧。1.架构首先我们来看一个简单的架构图:这个图把问题说的很清楚了:首先,Client发送消息。与普通消息相比,这条消息多了两个关键内容:一是correlation_id,表示这条消息的唯一id,内容是reply_to,表示消息回复队列的名称。服务器从消息发送队列中获取消息,并处理相应的业务逻辑。处理完成后,将处理结果发送到reply_to指定的回调队列。客户端从回调队列中读取消息以了解消息的执行情况。这种情况其实很适合处理异步调用。2.实践接下来我们通过一个具体的例子来看看这是如何实现的。2.1客户端开发首先,我们创建一个名为producer的SpringBoot项目。作为消息生产者,在创建时添加web和rabbitmq依赖,如下图:项目创建成功后,首先在application.properties中配置RabbitMQ的基本信息,如下:spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true这个配置的前四行很容易理解,所以我不会详细介绍。接下来两行:首先,配置消息确认方式。我们通过相关确认。只有启用此配置后,correlation_id才会包含在以后的消息中。只有通过correlation_id才能发送消息和链接返回值。最后一行配置是开启发送失败返回。接下来我们提供一个配置类,如下:www.javaboy.org*@微信a_java_boy*@GitHubhttps://github.com/lenve*@Giteehttps://gitee.com/lenve*/@ConfigurationpublicclassRabbitConfig{publicstaticfinalStringRPC_QUEUE1="queue_1";publicstaticfinalStringRPC_QUEUE2="queue_2";publicstaticfinalStringRPC_EXCHANGE="rpc_exchange";/***设置消息发送RPC队列*/@BeanQueuemsgQueue(){returnnewQueue(RPC_QUEUE1);}/***设置返回队列*/@BeanQueuereplyQueue(){returnnewQueue(RPC_QUEUE2);}/***设置交换*/@BeanTopicExchangeexchange(){returnnewTopicExchange(RPC_EXCHANGE);}/***请求队列和交换绑定*/@BeanBindingmsgBinding(){returnBindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);}/***返回队列和交换绑定*/@BeanBindingreplyBinding(){returnBindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);}/***使用RabbitTemplate发送接收消息*并设置回调队列地址s*/@BeanRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplatetemplate=newRabbitTemplate(connectionFactory);template.setReplyAddress(RPC_QUEUE2);template.setReplyTimeout(6000);returntemplate;}/***设置返回队列的监听器*/@BeanSimpleMessageListenerContainerreplyContainer(ConnectionFactoryconnectionFactory){SimpleMessageListenerContainerContainercontainer=newSimpleMessengactory(Listen)connectionFactory);container.setQueueNames(RPC_QUEUE2);container.setMessageListener(rabbitTemplate(connectionFactory));返回容器;绑定开关是RabbitMQ的常规操作,所以没什么好说的。在SpringBoot中,我们负责发送消息的工具是RabbitTemplate。默认情况下,系统会自动提供这个工具,但是这里我们需要重新自定义这个工具,主要是添加消息发送的返回队列,最后我们需要给返回队列设置一个监听器。好了,那我们就可以开始发具体消息了:javaboy.org*@微信a_java_boy*@GitHubhttps://github.com/lenve*@Giteehttps://gitee.com/lenve*/@RestControllerpublicclassRpcClientController{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(RpcClientController.class);@AutowiredprivateRabbitTemplaterabbitTemplate;@GetMapping("/send")publicStringsend(Stringmessage){//创建消息对象MessagenewMessage=MessageBuilder.withBody(message.getBytes()).build();logger.info("clientsend:{}",newMessage);//客户端发送消息Messageresult=rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE,RabbitConfig.RPC_QUEUE1,newMessage);Stringresponse="";if(result!=null){//获取发送消息的correlationIdStringcorrelationId=newMessage。getMessageProperties().getCorrelationId();logger.info("correlationId:{}",correlationId);//获取响应头信息HashMapheaders=(HashMap)result.getMessageProperties().getHeaders();//获取服务器返回的消息idStringmsgId=(String)headers.get("spring_returned_message_correlation");if(msgId.equals(correlationId)){response=newString(result.getBody());logger.info("clientreceive:{}",response);}}returnresponse;}}这一块的代码其实就是一些常规的代码,我挑几个关键节点说一下:消息发送调用sendAndReceive方法,这个方法自带返回值,返回值是服务器返回的消息在服务器返回的消息中,header信息中包含spring_returned_message_correlation字段,即消息发送时的correlation_id,消息发送时的correlation_id,以及返回消息头值中的spring_returned_message_correlation字段,我们可以将返回的消息内容和发送的消息进行绑定,确认返回的内容是针对发送的消息。这是整个客户端的开发。其实核心就是sendAndReceive方法的调用。虽然调用简单,但准备工作还是很充分的。比如我们不在application.properties中配置correlation,那么发送的消息中就没有correlation_id,这样返回的消息内容就无法与发送的消息内容相关联。2.2服务端开发我们再来看看服务端开发。首先,创建一个名为consumer的SpringBoot项目。项目中添加的依赖与客户端开发创建的依赖一致,这里不再赘述。然后配置application.properties配置文件。该文件的配置也与客户端中的配置一致,这里不再赘述。接下来,提供了一个RabbitMQ配置类。这个配置类比较简单。简单的配置消息队列,绑定到消息开关,如下:本站http://www.javaboy.org*@微信a_java_boy*@GitHubhttps://github.com/lenve*@Giteehttps://gitee。com/lenve*/@ConfigurationpublicclassRabbitConfig{publicstaticfinalStringRPC_QUEUE1="queue_1";publicstaticfinalStringRPC_QUEUE2="queue_2";publicstaticfinalStringRPC_EXCHANGE="rpc_exchange";/***配置消息发送队列*/@BeanQueuemsgQueue(){returnnew/QUEUE1(RPC)*设置返回queue*/@BeanQueuereplyQueue(){returnnewQueue(RPC_QUEUE2);}/***设置exchange*/@BeanTopicExchangeexchange(){returnnewTopicExchange(RPC_EXCHANGE);}/***请求queue和exchange绑定*/@BeanBindingmsgBinding(){returnBindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);}/***返回队列和交换绑定*/@BeanBindingreplyBinding(){returnBindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);}}最后我们来看一下消息的消费:@ComponentpublicclassRpcServerController{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(RpcServerController.class);@AutowiredprivateRabbitTemplaterabbitTemplate;@RabbitListener(queues=RabbitConfig.RPC_QUEUE1)publicvoidprocess(Messagemsg){logger.info("serverreceive:{}",msg.toString());Messageresponse=MessageBuilder.withBody(("i'mreceive:"+newString(msg.getBody())).getBytes()).build();CorrelationDatacorrelationData=newCorrelationData(msg.getMessageProperties().getCorrelationId());rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE,RabbitConfig.RPC_QUEUE2,response,correlationData);}}这里的逻辑比较简单:服务端先收到消息,打印出来。服务器提取原始消息中的correlation_id。服务端调用sendAndReceive方法将消息发送到参数为correlation_id的RPC_QUEUE2队列。服务器发送消息后,客户端将收到服务器返回的结果。好的,你完成了。2.3测试接下来我们进行一个简单的测试。首先启动RabbitMQ。接下来分别启动producer和consumer,然后在postman中调用producer的接口进行测试,如下:可以看到已经收到了服务端的返回信息。我们看一下生产者的运行日志:可以看到消息发出后,也收到了消费者返回的消息。可以看到,消费者也收到了客户端的消息。3.总结好了,一个小案例,让小伙伴们体验一个RabbitMQ实现RPC调用。