我在写SpringBoot基础教程的时候写了一篇文章《Spring Boot中使用RabbitMQ》。在本文中,我们可以通过简单的配置和注解向RabbitMQ生产和消费消息。其实我们使用RabbitMQ的starter是通过SpringCloudStream对RabbitMQ的支持实现的。让我们通过这篇文章来了解一下SpringCloudStream。SpringCloudStream是一个为微服务应用构建消息驱动能力的框架。它可以基于SpringBoot来创建独立的、生产就绪的Spring应用程序。它使用SpringIntegration连接消息代理中间件,实现消息事件驱动的微服务应用。SpringCloudStream为部分供应商的消息中间件产品提供了个性化的自动配置实现,引入了发布订阅、消费者组和消息分区三个核心概念。简单的说,SpringCloudStream本质上是集成了SpringBoot和SpringIntegration,实现了一个轻量级的消息驱动的微服务框架。通过使用SpringCloudStream,开发者可以有效简化消息中间件的使用复杂度,让系统开发者更专注于核心业务逻辑的处理。由于SpringCloudStream基于SpringBoot,继承了SpringBoot的优点,实现了自动配置的功能,帮助我们快速上手。不过到目前为止,SpringCloudStream只支持以下两个著名的消息中间件配置的自动化:RabbitMQKafkaQuickStart下面我们通过搭建一个简单的例子来初步了解一下SpringCloudStream。此示例的主要目标是构建一个基于SpringBoot的微服务应用程序,该应用程序将使用消息中间件RabbitMQ接收消息并将它们打印到日志中。因此,在执行以下步骤之前,请先确认本地已经安装了RabbitMQ。具体安装步骤可以参考这篇文章。搭建一个SpringCloudStream消费者创建一个基础的SpringBoot工程,命名为:stream-hello编辑pom.xml中的依赖,引入SpringCloudStream对RabbitMQ的支持,如下:org.springframework.bootspring-boot-starter-parent1.5.9.RELEASE<依赖项>org.springframework.bootspring-boot-starter-testtestorg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.cloudspring-cloud-dependenciesDalston.SR4pomimport创建一个消费者SinkReceiver,用于接收来自RabbitMQ的消息,具体如下:@EnableBinding(Sink.class)publicclassSinkReceiver{privatestaticLoggerlogger=LoggerFactory.getLogger(SinkReceiver.class);@StreamListener(Sink.INPUT)publicvoidreceive(Objectpayload){logger.info("Received:"+payload);}}创建应用程序主类和其他SpringBoot一样,这里没有什么特别之处,如下所示:@SpringBootApplicationpublicclassSinkApplication{publicstaticvoidmain(String[]args){SpringApplication.run(SinkApplication.class,args);}}这里,我们的快速入门示例编码任务已经完成。让我们分别启动RabbitMQ和SpringBoot应用,然后做下面的实验看看它们是如何工作的。手动测试验证下面看一下SpringBoot应用的启动日志。...INFO16272---[main]o.s.c.s.b.r.RabbitMessageChannelBinder:declaringqueueforinbound:input.anonymous.Y8VsFILmSC27eS5StsXp6A,boundto:inputINFO16272---[main]o.s.a.r.c.CachingConnectionFactory:Creatednewconnection:SimpleConnection@3c78q2.am0.551[dleguest@3c78eguest0.am551]:5672/]INFO16272---[main]o.s.integration.channel.DirectChannel:Channel'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge'has1subscriber(s).INFO16272---[main]o.s.i.a.i.AmqpInboundChannelAdapter:startedinbound.inputSt2YmSC7Lp.anonymous....从上面的日志内容,我们可以得到如下信息:一个指向127.0.0.1:5672的RabbitMQ连接是使用guest用户创建的,我们也可以在RabbitMQ控制台中找到。声明一个名为input.anonymous.Y8VsFILmSC27eS5StsXp6A的队列,并通过RabbitMessageChannelBinder将自己绑定为其消费者。我们也可以在RabbitMQ控制台中找到这些信息。接下来我们可以在RabbitMQ控制台进入input.anonymous.Y8VsFILmSC27eS5StsXp6A队列的管理页面,通过PublishMessage函数向队列发送消息。此时我们可以在当前启动的SpringBoot应用的控制台中看到如下内容:INFO16272---[C27eS5StsXp6A-1]com.didispace.HelloApplication:Received:[B@7cba610e我们可以在应用控制台中找到输出的内容由SinkReceiver中的receive方法定义,输出的具体内容是从消息队列中获取的对象。由于我们这里没有序列化消息,所以输出的只是对象的引用。在后面的章节中,我们将详细介绍收到消息后的处理。在成功完成了上面的快速入门示例之后,我们来简单说明一下上面的步骤是如何将我们的SpringBoot应用连接到RabbitMQ来消费消息来实现消息驱动的业务逻辑的。首先,我们为SpringBoot应用做的是引入spring-cloud-starter-stream-rabbit依赖,它是对SpringCloudStream对RabbitMQ的支持的封装,其中包含了RabbitMQ的自动配置等内容。从它下面定义的依赖我们也可以知道它相当于spring-cloud-stream-binder-rabbit依赖。org.springframework.cloudspring-cloud-stream-binder-rabbit接下来我们再来看看这里SinkReceiver中定义了SpringCloudStream的几个核心注解:@EnableBinding,用于指定一个或多个定义@Input或@Output注解的接口,实现消息通道(Channel)绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了Sink接口,这是SpringCloudStream中默认定义的输入消息通道绑定。其源码如下:publicinterfaceSink{StringINPUT="input";@Input(Sink.INPUT)SubscribableChannelinput();}通过@Input注解绑定了一个名为input的通道。除了Sink,SpringCloudStream还实现了默认绑定输出通道的Source接口,以及结合Sink和Source的Processor接口。在实际使用中,我们还可以通过@Input和@Output注解来定义绑定消息。频道的界面。当我们需要为@EnableBinding指定多个接口来绑定消息通道时,我们可以这样定义:@EnableBinding(value={Sink.class,Source.class})。@StreamListener:该注解主要定义在方法上,作用是将修改后的方法注册为消息中间件上数据流的事件监听器。注解中的属性值对应要监控的消息通道名称。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为输入消息通道的监听器,这样当我们在RabbitMQ控制页面发布消息时,receive方法就会响应响应动作。编写消息消费单元测试用例以上我们完成了通过RabbitMQ控制台发送消息,验证消息消费程序的功能。这种方法虽然比较low,但是通过上面的步骤,相信大家对RabbitMQ和SpringCloudStreamConsumption的消息比较熟悉,已经有了一些基本的了解。让我们通过编写生成消息的单元测试用例来完善我们的介绍性内容。在上面创建的项目中创建单元测试类:@RunWith(SpringRunner.class)@EnableBinding(value={SinkApplicationTests.SinkSender.class})publicclassSinkApplicationTests{@AutowiredprivateSinkSendersinkSender;@TestpublicvoidsinkSenderTester(){sinkSender.output().send(MessageBuilder.withPayload("produceamessage:http://blog.didispace.com").build());}publicinterfaceSinkSender{StringOUTPUT="input";@Output(SinkSender.OUTPUT)MessageChanneloutput();}}应用了上面的之后消息消费者程序,运行这里定义的单元测试程序,我们可以立即在消息消费者的控制台收到如下内容:INFO50947---[L2W-c2AcChb2Q-1]com.didispace.stream.SinkReceiver:Received:produceamessage:http://blog.didispace.com在上面的单元测试中,我们通过@Output(SinkSender.OUTPUT)定义了一个输出,输出通道的名称为input,与前面Sink中定义的消费相同通道具有相同的名称,因此这里的单元测试和前面的消费者程序组成了一对生产者和消费者。至此,本文的内容就结束了。如果你能独立完成上面的例子,那么SpringCloudStream的基本使用也算是入门了。然而,SpringCloudStream的用途远不止于此。在最近的博文中,我会继续更新这部分内容,帮助大家了解和使用SpringCloudStream构建消息驱动的微服务!【本文为专栏作家“翟永超”原创稿件,转载请联系作者获得授权】点此查看该作者更多好文