当前位置: 首页 > 后端技术 > Java

rocketONS-starter:阿里云ONS消息服务轻量级SpringBootStarter

时间:2023-04-01 21:00:18 Java

rocketONS-starter简介rocketONS-starter是一个基于阿里云ONS消息服务的轻量级SpringBootStarter。它为您提供了一个简单高效的封装,使您能够快速创建RocketMQ生产者和消费者,实现应用程序之间的消息传递。适用于高并发、高吞吐、低延迟的分布式消息场景。软件架构rocketONS-starter基于Spring的ApplicationContext容器管理,自动扫描消费者监听器,注册并启动消费者接收和处理阿里云ONS服务分发的消息。根据配置文件动态创建消费者和生产者,自定义消费者的启动和停止开关,自动序列化和解析消息实体。同时还支持消息过滤、顺序消息、延迟消息等高级功能,满足不同场景的需求。主要特点简单易用:提供简洁的配置和API,快速上手,轻松实现消息的生产和消费。高性能:基于阿里云ONS消息服务,享受高并发、高吞吐、低延迟的分布式消息服务。弹性扩展:根据业务需要自由添加消费者和生产者,实现弹性扩展。高级功能支持:支持消息过滤、顺序消息、延迟消息等高级功能,满足不同场景的需求。安装在你的项目中添加如下依赖:io.gitee.zhucan123rocket-ons-spring-boot-starter1.0.8使用说明1.在项目rocket中添加配置:地址:http://xxxxsecretKey:xxxxaccessKey:xxxxtopic:xxxxgroupSuffix:GID_enable:truedelay:1000参数名类型是否为必填项默认值说明accessKeyString是-身份认证的AccessKeyId,在阿里云账号管理控制台创建。secretKeyString是-用于身份认证的AccessKeySecret,在阿里云账号管理控制台创建。addressString是-设置TCP协议访问点。groupSuffixString否GID_控制台创建的GroupID的前缀,通常以“GID_”开头。topicString是-当生产者没有指定主题时使用的默认绑定主题。delayInteger否1000消息发送延迟,以毫秒为单位。enableBooleannotrue是否开启启动器。2.在主程序中启用rocketONS-starter@EnableRocketONSpublicclassApp{publicstaticvoidmain(String[]args){SpringApplication.run(App.class,args);}}3.示例代码:useconsumer@ConsumerListener(tags="msg_tag",consumers=2)@OnsConfiguration(topic="topic-example",group="GID_${example.group}")publicclassExampleConsumerListenerimplementsRocketListener{@OverridepublicActionconsume(Messagemessage,MessageDatamessageBody,ConsumeContextconsumeContext){//处理业务逻辑returnAction.CommitMessage;}}@OnsConfiguration:注册为Spring容器,设置消费者绑定的topic和group。可以设置固定值,也可以使用${propertiesKey}读取配置文件中的配置。@ConsumerListener:标识这是一个ONS消息消费者监听器。可以设置标签过滤消息,设置消费者指定消费者线程数。4.示例代码:useproducer@ServicepublicclassExampleProducerService{@AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidsendMessage(MessageDatamessageData){rocketMQTemplate.syncSend("topic-example:msg_tag",messageData);同步发送消息。方法参数中的字符串格式为topic:tag,表示发送到指定主题并设置消息标签。高级特性1.顺序消息使用RocketMQTemplate的syncSendOrderly方法发送顺序消息,保证消费者按照消息发送的顺序处理消息。rocketMQTemplate.syncSendOrderly("topic-example:msg_tag",messageData,orderId);2.发送延迟消息时,通过设置messageDelayLevel参数指定延迟级别。rocketMQTemplate.syncSend("topic-example:msg_tag",messageData,messageDelayLevel);3.消息过滤使用@ConsumerListener注解的tags属性实现消息过滤。设置相应的标签值,消费者只会消费带有该标签的消息。@ConsumerListener(tags="msg_tag",consumers=2)publicclassExampleConsumerListenerimplementsRocketListener{...}请注意,在生产消息时需要设置相应的标签。rocketMQTemplate.syncSend("topic-example:msg_tag",messageData);4、异步发送消息除了同步发送消息,RocketMQTemplate还提供了异步发送消息的方法。使用asyncSend方法发送消息,并提供回调函数处理发送结果。rocketMQTemplate.asyncSend("topic-example:msg_tag",messageData,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){//处理发送成功的逻辑}@OverridepublicvoidonException(Throwablee){//处理发送失败逻辑}});5.广播模式广播模式允许你向所有消费者发送消息。要启用广播模式,您需要在消费者侦听器中将广播属性设置为true。@ConsumerListener(tags="msg_tag",consumers=2,broadcast=true)publicclassExampleConsumerListenerimplementsRocketListener{...}6.消息重试策略当消费者处理消息失败时,可以返回Action.ReconsumeLater触发消息重试。您可以在配置文件中设置重试策略。yamlCopycoderocket:retry:3delay:1000参数名称类型必填默认值说明retryInteger否消息重试3次。delayInteger否1000消息发送延迟,以毫秒为单位。7.自定义序列化和反序列化默认情况下,rocketONS-starter使用Java来序列化和反序列化消息。但是您可以通过实现MessageSerializer接口来自定义序列化和反序列化策略。publicclassCustomMessageSerializerimplementsMessageSerializer{@Overridepublicbyte[]serialize(Objectobj){//实现自定义序列化逻辑}@OverridepublicTdeserialize(byte[]data,Classclazz){//实现自定义反序列化逻辑}}然后,在配置文件中指定自定义序列化器。rocket:serializer:com.example.CustomMessageSerializer注意事项确保生产环境和消费环境使用相同的序列化和反序列化策略。使用Action.ReconsumeLater触发消息重试时,注意避免重试风暴,以免影响整体性能。8、延迟消息RocketMQ支持延迟消息发送,发送时可以指定延迟级别。要发送延迟消息,请使用sendDelay方法。intdelayLevel=3;//延迟级别,具体延迟时间需要参考RocketMQ的延迟级别配置rocketMQTemplate.sendDelay("topic-example:msg_tag",messageData,delayLevel);9、事务性消息RocketMQ支持发送事务性消息,可以将发送的消息与本地事务相关联。使用sendMessageInTransaction方法发送事务性消息。rocketMQTemplate.sendMessageInTransaction("topic-example:msg_tag",messageData,newLocalTransactionExecuter(){@OverridepublicLocalTransactionStateexecuteLocalTransactionBranch(Messagemsg,Objectarg){//执行本地事务//返回事务状态}},null);10.集成测试为了便于集成测试,您可以使用RocketMQTestListener注释启动测试消费者。测试消费者会将接收到的消息存储在内存中,以便在测试时进行验证。@RocketMQTestListener(topics="topic-example",tags="msg_tag")publicclassExampleConsumerListenerimplementsRocketListener{...}在测试用例中,可以使用RocketMQTestListener.getMessages()方法获取接收到的消息.11、性能调优为了提高系统性能,可以通过以下方式对RocketMQ进行调优:调整线程池大小。优化网络参数,例如连接超时。调整消费者拉取批量大小。优化消息堆积参数。具体参数配置请参考RocketMQ官方文档。12.如果您对社区和支持有任何疑问,请在码云上提交问题。欢迎参与项目开发,可以通过Fork仓库提交PullRequest。更多信息请关注作者码云主页。