当前位置: 首页 > 科技观察

SpringCloudStream使用延迟消息实现定时任务(RabbitMQ)

时间:2023-03-12 02:22:45 科技观察

应用场景当我们使用一些开源的调度系统(如:elastic-job等)时,任务的执行时间通常是有规律的,这可能是每半小时或每天凌晨1点执行一次。但是实际业务中还有一种定时任务,可能需要一些触发条件才能开始定时,比如:写博文时,设置2小时后发送。对于这些开始时间不确定的定时任务,我们也可以通过SpringCloudStream来很好的处理。为了触发开始时间不确定的定时任务,我们将引入延迟消息的使用。RabbitMQ提供了延迟消息的插件,因此本文将专门介绍如何使用SpringCloudStream和RabbitMQ轻松处理上述问题。尝试手动安装插件。RabbitMQ延迟消息的插件介绍可以查看官网:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/安装方法很简单,只需要在这个页面找到rabbitmq_delayed_message_exchange插件:http://www.rabbitmq.com/community-plugins.html,根据你使用的RabbitMQ版本选择对应的插件版本下载即可。注意:只有RabbitMQ3.6.x及以上版本支持下载后解压后以.ez结尾的插件包,复制到RabbitMQ安装目录下的plugins文件夹中。然后通过命令行启用插件:rabbitmq-pluginsenablerabbitmq_delayed_message_exchange插件通过上述命令启用后可以直接使用,无需重启。另外,如果你没有启用这个插件,你可能会遇到这样的错误:ERROR156---[127.0.0.1:5672]o.s.a.r.c.CachingConnectionFactory:Channelshutdown:connectionerror;protocolmethod:#method(reply-code=503,reply-text=COMMAND_INVALID-unknownexchangetype'x-delayed-message',class-id=40,method-id=10)applicationcoding下面写一个简单的例子来理解这个属性的用法:@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublicclassTestApplication{publicstaticvoidmain(String[]args){SpringApplication.run(TestApplication.class,args);}@Slf4j@RestControllerstaticclassTestController{@AutowiredprivateTestTopictestTopic;/***消息生产接口**@parammessage*@return*/@GetMapping("/sendMessage")publicStringmessageWithMQ(@RequestParamStringmessage){log.info("Send:"+message);testTopic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay",5000).build());return"ok";}}/***消息消费逻辑*/@Slf4j@ComponentstaticclassTestListener{@StreamListener(TestTopic.INPUT)publicvoidreceive(Stringpayload){log.info("Received:"+payload);}}interfaceTestTopic{StringOUTPUT="example-topic-output";StringINPUT="example-topic-input";@Output(OUTPUT)MessageChanneloutput();@Input(INPUT)SubscribableChannelinput();}}内容很简单,既包括消息生产,也包括消息消费在/sendMessage接口的定义中,发送消息,消息头信息包含x-delay字段,用于指定消息延迟时间,以毫秒为单位。所以上面代码发送的消息会在5秒后被消费。在消息监听类TestListener中,为TestTopic.INPUT通道定义了@StreamListener,这里会针对延迟的消息实现具体的逻辑。由于消息的消费是延迟的,因此变相实现了从消息发送那一刻开始的定时任务。在启动应用程序之前,需要进行一些必要的配置。消息生产者和消费者描述如下:消息生产者spring.cloud.stream.bindings.example-topic-output.destination=delay-topicsspring.cloud。stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true这里注意一个新的参数spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用来开启延迟消息的功能,这样在创建交易所的时候,就会设置为具有延迟特性的交易所,也就是使用我们上面安装的延迟消息插件的功能。消息消费者spring.cloud.stream.bindings.example-topic-input.destination=delay-topicsspring.cloud.stream.bindings.example-topic-input.group=testspring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true在consumer端也是一样的,需要设置spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true。如果不设置该参数,会出现类似如下错误:ERROR9340---[127.0.0.1:5672]o.s.a.r.c.CachingConnectionFactory:Channelshutdown:channelerror;protocolmethod:#method(reply-code=406,reply-text=PRECONDITION_FAILED-inequivalentarg'type'forexchange'delay-topic'invhost'/':received'topic'butcurrentis''x-delayed-message'',class-id=40,method-id=10)完成了以上配置完成后,就可以启动应用,尝试访问localhost:8080/sendMessage?message=hello接口向MQ发送消息。此时可以看到类似如下的日志:2019-01-0223:28:45.318INFO96164---[ctor-http-nio-3]c.d.s.TestApplication$TestController:Send:hello2019-01-0223:28:45.328INFO96164---[ctor-http-nio-3]o.s.a.r.c.CachingConnectionFactory:Attemptingtoconnectto:[localhost:5672]2019-01-0223:28:45.333INFO96164---[ctor-http-nio-3]o.s.a.r.c.CachingConnectionFactory:创建了新连接:rabbitConnectionFactory。publisher#5c5f9a03:0/SimpleConnection@3278a728[delegate=amqp://guest@127.0.0.1:5672/,localPort=53536]2019-01-0223:28:50.349INFO96164---[ay-topic.test-1]c.d.stream.TestApplication$TestListener:Received:hello从日志中我们可以看到Send:hello和Received:hello的两次输出之间有5秒的间隔,符合上面的编码。深度思考已经在代码层面完成了定时任务,那么我们如何查看延迟消息的数量等信息呢?此时,我们可以打开RabbitMQweb控制台,首先进入Exchanges页面,查看这个特殊的exchange,如下:可以看到,这个exchange的Type是x-delayed-message。点击交易所名称进入详细页面,可以看到更具体的信息:代码示例读者可以查看下面仓库中的stream-delayed-message项目:GithubGitee