当前位置: 首页 > 科技观察

SpringCloudStream使用详解和部分关键源码分析

时间:2023-03-14 00:42:10 科技观察

环境:Springboot2.3.12.RELEASE+SpringCloudHoxton.SR12+RabbitMQ3.8.12介绍SpringCloudStream是一个用于构建与MQDrive微服务连接的高度可扩展事件的框架。其目的是简化SpringCloud应用程序中消息传递的开发。屏蔽了各种MQ之间的差异,更换MQ时无需修改代码。SpringCloudStream支持多种binder实现,如下:RabbitMQ。阿帕奇卡夫卡。卡夫卡流。亚马逊运动。GooglePubSub(合作伙伴维护)。SolacePubSub+(合作伙伴维护)。Azure事件中心(合作伙伴维护)。AWSSQS(合作伙伴维护)。AWSSNS(合作伙伴维护)。ApacheRocketMQ(合作伙伴维护)。详细查看官方文档,每个MQ对应都有一个Github地址。SpringCloudStream的核心构建块是:DestinationBinders:负责与MQ集成的组件。DestinationBindings:MQ中间件和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。消息:生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。Stream内核组关联图快速入门依赖:Hoxton.SR12org.springframework.bootspring-boot-starter-amqporg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.cloudspring-cloud-dependencies<版本>${spring-cloud.version}pomimport应用配置:spring:rabbitmq:host:localhostvirtual-host:busport:5672username:xxxpassword:xxx---spring:cloud:stream:bindings:#自定义输入输出myInput:#指定输入通道对应的主题名称destination:demomyOutput:destination:演示创建接口绑定到消息通道:publicinterfaceStreamBinding{StringINPUT="myInput";字符串输出="我的输出";@Input(StreamBinding.INPUT)SubscribableChannel输入();@Output(StreamBinding.OUTPUT)MessageChanneloutput();}通过@Input和@Output注解定义输入通道和输出通道名称。这里的名称与上述配置文件中的名称相对应。在定义输出通道时,需要返回MessageChannel接口对象,它定义了向消息通道发送消息的方法。方法;定义输入通道时,需要返回SubscribableChannel接口对象,该接口对象集成自MessageChannel接口,定义了维护消息通道订阅者的方法。这里的Input和Output方法容器会分别创建一个Bean对象。创建消费者:@Component@EnableBinding(value={StreamBinding.class})publicclassStreamReceiver{privateLoggerlogger=LoggerFactory.getLogger(StreamReceiver.class);@StreamListener(StreamBinding.INPUT)publicvoidreceive(Stringmessage){logger.info("Receivedmessage:{}",message);}}@EnableBinding注解用于指定一个或多个定义了@Input或@Output注解的接口,从而实现消息通道(Channel)的绑定。上面我们通过@EnableBinding(value={StreamClient.class})绑定了StreamClient接口,这是我们自己实现的输入输出消息通道绑定的定义。@StreamListener主要定义在方法上,作用是将修改后的方法注册为消息中间件上数据流的事件监听器。注解中的属性值对应要监控的消息通道名称。上面我们将receive方法注册为myInput消息通道的监听处理器。当我们向这个消息通道发送信息时,receiver方法就会被执行。消息发送接口:@ResourceprivateStreamBindingstreamBinding;@GetMapping("/send")publicvoidsend(){streamBinding.output().send(MessageBuilder.withPayload("FirstMessage...").build());}启动服务:查看RabbitMQ是否已经自动为我们创建了一个队列。队列的名称以我们在配置文件中配置的配置开头,后面随机生成。该队列会自动删除AD,服务关闭后队列会自动删除;excl:独占,如果队列存在,则不会再次创建。修改端口后,启动另一个服务:创建2个队列,使用其中一个发送消息:两个服务都接收消息。消费者组上启动的两个服务都能收到消息。在集群环境下,这肯定会出问题。如果是业务,就会有重复的数据。这时候我们可以通过设置分组来解决这个问题。修改配置:spring:cloud:stream:bindings:myInput:#指定输入通道对应的topic名称destination:demo#指定一个group;指定组后,无论启动多少个实例,所有实例都会监听这个队列#More每个实例都会轮询接收消息group:g_testmyOutput:destination:demo再次启动服务后,两个服务会轮询接收消息。启动服务后,两个服务同时监听同一个队列。队列不是随机生成的,队列是持久化的,服务断开后队列不会自动删除。消息分区可以通过消费者组的设置,保证同一条消息只能被一个消费者接收和处理。但是对于特殊的业务情况,除了保证单实例消费外,还希望那些具有相同特征的消息能够被同一个消费者共享。一个实例消费,这个可以使用SpringCloudStream提供的消息分区功能。修改设置。spring:cloud:stream:bindings:myInput:#指定输入通道对应的主题名称destination:demo#指定一个组;指定组后,无论启动多少个实例,所有实例都会监听这个队列#多个实例会轮询接收到的消息:#这里的配置也可以是SpEL表达式,例如:headers['partition']通过消息头获取属性#这里会通过表达式和消息对象计算出一个Key,然后得到key的hashCode得到#得到hashCode后,会和partitionCount进行模运算得到具体的分区partitionKeyExpression:'1'#我这里给的值是对应的instanceIndex值,谁要接收可以设置配置值partitionCount:2#实例总数instanceCount:2#该参数设置当前实例的索引号,从0开始instanceIndex:0计算partition源码:最终得到分区信息后,会在消息头中放置一个以scst_partition为key,partition为value的header。启动多个实例后,测试发现所有消息均由同一个实例接收。交换机绑定每个服务,使用不同的RoutingKey,这样在发送消息时,可以根据计算处理的分区,定向发送消息。通过源码查看:这里是我们配置开关的一个demo。下一步是获取路由密钥。这里会从消息头中获取key=scst_partition的头信息。这样就确定了使用RabbitMQ为中间件发送消息所需的开关和路由键。