当前位置: 首页 > 科技观察

Spring如何优雅的发送和消费消息

时间:2023-03-11 22:41:11 科技观察

前言20世纪90年代后期,随着JavaEE(企业版)的出现,尤其是使用EnterpriseJavaBeans需要复杂的描述符配置和死板复杂的代码实现增加了学习曲线和开发商的开发成本。因此,基于简单XML配置和PlainOldJavaObjects(普通旧Java对象)的Spring技术应运而生。DependencyInjection、InversionofControl(控制反转)和Aspect-OrientedProgramming(AOP)技术更敏捷地解决了传统Java企业和版本的不足。随着Spring的不断演进,基于注解(Annotation)的配置逐渐取代了XML文件配置。2014年4月1日,SpringBoot1.0.0正式发布,本着“约定优于配置”的原则,通过简单的结合各种starter(例如spring-boot-web-starter),应用程序可以直接在命令行上运行,而无需部署到单独的容器中。这种简单、直接、快速的构建和开发应用程序的过程可以使用约定的配置并简化部署,受到越来越多开发者的欢迎。ApacheRocketMQ是业界知名的分布式消息和流处理中间件。简单理解,它由Broker服务器和客户端两部分组成:其中一个客户端是消息发布客户端(Producer),它负责向Broker服务器发送消息。信息;另一个是消息的消费者客户端(Consumer)。多个消费者可以组成一个消费者组来订阅和拉取存储在消费者Broker服务器上的消息。为了利用SpringBoot的快速发展,让用户更灵活的使用RocketMQ消息客户端,ApacheRocketMQ社区推出了spring-boot-starter实现。随着RocketMQ4.3.0中分布式事务消息功能的上线,相关的spring-boot代码也在近期进行了升级,支持分布式事务的审核以及通过注解方式发送事务消息。本文将简要介绍目前的设计实现。读者可以通过本文了解将RocketMQClient集成到spring-boot-starter框架中的开发细节,然后通过一个简单的例子一步步讲解如何使用这个spring-boot-starter。用于配置、发送和使用RocketMQ消息的引导启动工具包。Spring中的消息框架顺便说一下Spring中消息的两个主要框架,分别是SpringMessaging和SpringCloudStream。它们都可以与SpringBoot集成,并提供一些参考实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效的简化开发者使用消息中间件的复杂度,让系统开发者更专注于核心业务逻辑的处理。2.1SpringMessagingSpringMessaging是SpringFramework4中新增的模块,是对Spring与消息系统集成的可扩展支持。它实现了从简单使用基于JmsTemplate的JMS接口到异步接收消息的一整套基础设施。SpringAMQP提供了协议所需的一组类似的功能。与SpringBoot集成后,具有自动配置能力,可以在测试和运行时与相应的消息系统集成。SpringMessaging纯粹是为了客户端,提供了一套抽象的API或约定的标准来指定消息发送者和消息接收者的模式。不同的消息中间件提供商可以在这种模式下提供自己的消息中间件。Spring实现:消息发送端需要实现一个XXXTemplate形式的JavaBean,结合SpringBoot的自动配置选项,提供多种不同的消息发送方式;在消息消费者端是一个XXXMessageListener接口(实现上通常使用一个Annotations声明一个消息驱动的POJO),提供一个回调方法来监听和消费消息,这个接口也可以使用SpringBoot的自动化选项和一些自定义属性。如果你有兴趣深入了解SpringMessaging以及不同消息产品的使用,推荐阅读这篇文档。RocketMQ的spring-boot-starter参考了现有的SpringMessaging实现,遵循了相关的设计模式,结合RocketMQ自身的功能特点,提供了相应的API(如顺序的、异步的、事务性的半消息等)。2.2SpringCloudStreamSpringCloudStream结合了SpringIntegration的注解和功能。其应用模型如下:图片引用自springcloudstreamSpringCloudStream框架提供了一个独立的应用核心,它通过输入(@Input)和输出(@Output)通道与外界通信,消息源(Source)通过输入通道发送消息,消费目标(Sink)通过监听输出通道获取消费的消息。这些通道通过专用的Binder连接到外部代理。开发者的代码只需要针对应用内核提供的固定接口和注解方法进行编程,不需要关心运行时具体绑定的Binder消息中间件。在运行时,SpringCloudStream可以自动检测并使用类路径下找到的Binder。这样开发者就可以很方便的在同一段代码中使用不同类型的中间件:只需要在构建时包含不同的Binder即可。在更复杂的使用场景下,你还可以在应用中封装多个Binder,让它自己选择Binder,甚至在运行时为不同的渠道使用不同的Binder。Binder抽象使SpringCloudStream应用程序可以灵活地连接到中间件。另外,SpringCloudStream利用了SpringBoot灵活的配置配置能力。这样的配置可以通过外部配置的属性和SpringBoo支持的任何形式提供(包括应用程序启动参数、环境变量和application.yml或application.properties文件),部署者可以动态选择通道连接目的地(例如,Kafka的主题或RabbitMQ的交换)在运行时。BinderSPI允许消息中间件产品使用可扩展的API编写相应的Binder并将其集成到SpringCloudSteam环境中。目前RocketMQ没有提供相关的Binder。我们计划在下一步中改进此功能。也希望社区有这方面经验的同学积极尝试,贡献PR或建议。spring-boot-starter的实现我们一开始就已经知道,springbootstarter构建的starter对于用户来说是非常方便的。用户只需要在pom.xml中引入starter的依赖定义,相应的编译、运行、部署等功能全部自动导入。因此,常用的开源组件都会为Spring用户提供一个spring-boot-starter包供开发者使用,方便开发者集成使用。下面详细介绍一下RocketMQ(客户端)的starter实现过程。3.1.spring-boot-starter的实现步骤一个spring-boot-starter的实现需要包括以下几个部分:1.在pom.xml定义org.apache.rocketmq中定义要生成的starter组件信息spring-boot-starter-rocketmq1.0.0-SNAPSHOT定义了依赖包,分为两部分:A,Spring自带的依赖包;B、RocketMQ的依赖包org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-testtestorg.apache.rocketmqrocketmq客户端${rocketmq-version}org.springframework.bootspring-boot-starter-parent${spring.boot.version}pomimport2.配置文件类定义应用属性配置文件类RocketMQProperties,这个Bean定义了一组默认的属性值使用finalstarter时,用户可以根据该类定义的属性修改其值。当然不是直接修改这个类的配置,而是修改spring-boot应用中对应的配置文件:src/main/resources/application.properties。3.定义自动加载类在src/resources/META-INF/spring.factories文件中定义自动加载类,其目的是让springboot自动初始化相关的Bean和Component,使用文中指定的自动配置类或者Service,其内容如下:org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration在RocketMQAutoConfiguration类的具体实现中,定义了对用户开放的Bean对象直接使用。其中:RocketMQProperties是加载应用属性配置文件的处理类;RocketMQTemplate是发送方用户发送消息的发送模板类;ListenerContainerConfiguration容器bean负责发现并注册消费端消费实现接口类。该类要求:由@RocketMQMessageListener注解;实现RocketMQListener泛化接口。4、最后在发送者(producer)和消费者(consumer)客户端分别打包具体的RocketMQ相关包,当前实现版本提供了对SpringMessaging接口的兼容方法。3.2.消息发送端的实现1.普通发送端发送端的代码封装在RocketMQTemplatePOJO中。下图是发送端相关代码的调用关系图:为了兼容SpringMessaging的发送模板,在RocketMQTemplate中集成了AbstractMessageSendingTemplate抽象类来支持相关的消息转换和发送方法,这些方法将最终被委托给doSend()方法;doSend()和RocoketMQ特有的一些异步、单向、顺序等方法直接添加到RoketMQTempalte中,这些方法直接委托给RocketMQ的ProducerAPI调用Sendmessages。2.交易消息发送端的交易消息处理在消息发送端进行了部分扩展。参考下图调用关系类图:RocketMQTemplate增加了发送事务消息的方法sendMessageInTransaction(),最终这个方法会Proxy到RocketMQ的TransactionProducer进行调用,其关联的TransactionListener实现类会注册到这个上面Producer,以便消息发送后可以调用TransactionListener中的方法实现。3.3.消息消费者实现消费者端Spring-Boot应用启动后,会扫描所有包含@RocketMQMessageListener注解的类(这些类需要集成RocketMQListener接口并实现onMessage()方法),这个Listener会被放置在一个-to-one在DefaultRocketMQListenerContainer容器对象中,容器对象会根据消费模式(并发或顺序)将RocketMQListener封装到RocketMQ内部具体的并发或顺序接口实现中。在容器中创建一个RocketMQConsumer对象,启动并监听自定义的Topic消息,如果有消费者消息则回调Listener的onMessage()方法。以上章节介绍了RocketMQ在spring-boot-starter模式下的实现。下面以一个最简单的消息发送和消费的例子来介绍如何制作这个rocketmq-spring-boot-starter。4.1RocketMQ服务器准备工作启动NameServer和Broker验证RocketMQSpring-Boot客户端,首先要确保RocketMQ服务已正确下载和启动。可以参考RocketMQ主站快速入门进行操作。确保NameServer和Broker已正确启动。2.在实例中创建需要的Topics,在执行启动命令的目录下执行如下命令行操作bashbin/mqadminupdateTopic-cDefaultCluster-tstring-topic对于没有提交的Maven中心库,用户需要下载使用前git源码,然后执行mvncleaninstall安装到本地仓库。gitclonehttps://github.com/apache/rocketmq-externals.gitcdrocketmq-spring-boot-startermvncleaninstall4.3。编写客户端代码如果用户使用,需要在消息发布消费客户端的maven配置文件pom.xml中添加如下依赖:1.0.0-SNAPSHOTorg.apache.rocketmqspring-boot-starter-rocketmq${spring-boot-starter-rocketmq-version}属性spring-boot-starter-rocketmq-version的值为:1.0.0-SNAPSHOT,与本地仓库安装的版本一致在上一步中。发送方代码application.properties的配置文件#定义name-server地址spring.rocketmq.name-server=localhost:9876#定义publisher组名spring.rocketmq.producer.group=my-group1#定义发送什么topicspring.rocketmq.topic=string-topicsender'sJavacodeimportorg.apache.rocketmq.spring.starter.core.RocketMQTemplate;...@SpringBootApplicationpublicclassProducerApplicationimplementsCommandLineRunner{//声明并引用RocketMQTemplate@ResourceprivateRocketMQTemplaterocketMQTemplate.peries定义;/pro/use主题property@Value("${spring.rocketmq.springTopic}")privateStringspringTopic;publicstaticvoidmain(String[]args){SpringApplication.run(ProducerApplication.class,args);}publicvoidrun(String...args)throwsException{//向指定主题同步发送字符串消息SendResultsendResult=rocketMQTemplate.syncSend(springTopic,"Hello,World!");//打印发送结果信息System.out.printf("string-topicsyncSend1sendResult=%s%n",sendResult);}}2.消息消费者代码消费者配置文件application.properties#definename-server地址spring.rocketmq.name-server=localhost:9876#定义发布者组名spring.rocketmq.consumer.group=my-customer-group1#定义要发送的主题spring.rocketmq.topic=string-topic消费者端的Java代码@SpringBootApplicationpublicclassConsumerApplication{publicstaticvoidmain(String[]args){SpringApplication.run(ConsumerApplication.class,args);}}//声明消费消息的类,并在注解中指定,相关消费信息@Service@RocketMQMessageListener(topic="${spring.rocketmq.topic}",consumerGroup="${spring.rocketmq.consumer.group}")classStringConsumerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmessage){System.out.printf("--------StringConsumerreceived:%s%f",message);}}这里只是简单介绍一下使用spring-boot编写最基本的消息收发代码。如需了解更多调用方式,如:异步发送、对象消息体、指定tag标签、指定交易消息等,请参考github文档和详细代码,后续会陆续介绍这些高级功能。