当前位置: 首页 > 科技观察

RabbitMQ实现延迟队列的两种方式!

时间:2023-03-19 22:18:10 科技观察

各种定时任务。对于日志备份等常见的定时任务,我们可能会在每天凌晨3点进行备份。我们可以通过cron表达式轻松实现这个定时定时任务。还有一些特殊的定时任务,给大家看一下电影里的定时炸弹,3分钟后爆炸。这种定时任务不好用cron来描述,因为开始时间是不确定的,有时候我们在开发的时候会遇到类似的任务。需求,例如:在一个电商项目中,我们下单之后,一般需要在20分钟或者30分钟内付款,否则订单会进入异常处理逻辑并被取消,然后进入异常处理逻辑,它可以看作是一个延迟队列。我买了一个智能砂锅,可以用来煮粥。上班前把所有的材料都放进锅里,然后定好时间开始煮粥,这样下班后就能喝上香喷喷的粥了。那么这个煮粥器的一条指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了就执行。公司会议预约系统会在会议预约成功后,在会议开始前半小时通知所有已预约会议的用户。安全工单超过24小时未处理,自动拉企业微信群提醒相关责任人。用户下单外卖后,当距离超时还有10分钟时,会提醒外卖小哥超时即将结束。...我们在很多场景下都需要延迟队列。本文以RabbitMQ为例,跟大家聊一聊延迟队列的玩法。总体来说,在RabbitMQ上实现定时任务有两种方式:利用RabbitMQ自带的消息过期和私有消息队列机制来实现定时任务。使用RabbitMQ的rabbitmq_delayed_message_exchange插件实现定时任务比较简单。我们分别来看一下这两种用法。1.使用plugin1.1安装插件首先我们需要下载rabbitmq_delayed_message_exchange插件,这是github上的一个开源项目,我们可以直接下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases选择适合我自己的版本,我这里选择最新的3.9.0版本。下载完成后,在命令行执行如下命令将下载的文件复制到Docker容器中:dockercp./rabbitmq_delayed_message_exchange-3.9.0.ezsome-rabbit:/plugins这里第一个参数是宿主机上的文件地址machine,第二个参数是复制到容器中的位置。接下来执行以下命令进入RabbitMQ容器:dockerexec-itsome-rabbit/bin/bash进入容器后,执行以下命令启用插件:rabbitmq-pluginsenablerabbitmq_delayed_message_exchange启用成功后,还可以查看所有通过如下命令安装插件,查看是否有我们刚刚安装的插件,如下:rabbitmq-pluginslist命令完整的执行过程如下图所示:OK,配置完成之后,我们执行exit命令退出RabbitMQ容器。然后开始编码。1.2消息传递接下来,开始消息传递。首先我们创建一个SpringBoot项目,引入Web和RabbitMQ依赖,如下:项目创建成功后,在application.properties中配置RabbitMQ的基本信息,如下:spring.rabbitmq.host=localhostspring.rabbitmq.password=guestspring.rabbitmq。username=guestspring.rabbitmq.virtual-host=/接下来提供一个RabbitMQ的配置类:@ConfigurationpublicclassRabbitConfig{publicstaticfinalStringQUEUE_NAME="javaboy_delay_queue";publicstaticfinalStringEXCHANGE_NAME="javaboy_delay_exchange";publicstaticfinalqueEXCHANGE_TYayPE="x-returnnewQueue(QUEUE_NAME,true,false,false);}@BeanCustomExchangecustomExchange(){Mapargs=newHashMap<>();args.put("x-delayed-type","direct");returnnewCustomExchange(EXCHANGE_NAME,EXCHANGE_TYPE,true,false,args);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noargs();}}这里主要是switch的定义有区别,需要的朋友要注意。我们这里使用的开关是CustomExchange,它是Spring中提供的开关。创建CustomExchange时有五个参数,含义如下:交换机名称。开关型,这个地方是固定的。开关是否持久。如果没有绑定队列,是否删除交换。其他参数。最后一个args参数指定开关消息分发的类型。这种类型就是众所周知的direct、fanout、topic和header。使用哪种类型将决定交换机将来以何种方式分发消息。接下来我们创建一个消息消费者:@ComponentpublicclassMsgReceiver{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MsgReceiver.class);@RabbitListener(queues=RabbitConfig.QUEUE_NAME)publicvoidhandleMsg(Stringmsg){logger.info("handleMsg,{}",msg);}}打印消息的内容。接下来写一个发送消息的单元测试方法:@SpringBootTestclassMqDelayedMsgDemoApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads()throwsUnsupportedEncodingException{Messagemsg=MessageBuilder.withBody(("你好江南小雨"+newDate()).getBytes("UTF-8")).setHeader("x-delay",3000).build();rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,RabbitConfig.QUEUE_NAME,msg);}}在消息头设置消息的延迟时间。好了,接下来启动SpringBoot项目,然后运行单元测试方法发送消息。最终的控制台日志如下:从日志中可以看到实现了消息延迟。2.DLX实现延迟队列2.1延迟队列实现思路延迟队列实现的思路也很简单,就是我们在上一篇文章中提到的DLX(死信交换)+TTL(消息超时)。我们可以把死信队列看成是延迟队列。具体来说,如果一条消息需要延迟30分钟执行,我们将这条消息的有效期设置为30分钟,并为这条消息配置死信开关和死信routing_key,不为这条消息设置消费者这个消息队列,那么30分钟后,这个消息因为还没有被消费者消费,就进入了死信队列。这时,我们有一个消费者在死信队列中“停留”。消息一进入死信队列,立即被消费。这就是延迟队列的实现思路,是不是很简单?2.2案例接下来,宋大哥将通过一个简单的案例来演示延迟队列的具体实现。首先准备一个RabbitMQ启动。然后我们创建一个SpringBoot工程,引入RabbitMQ依赖:然后在application.properties中配置RabbitMQ的基本连接信息:spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672接下来我们来配置两个消息队列:一个普通队列,一个死信队列:@ConfigurationpublicclassQueueConfig{publicstaticfinalStringJAVABOY_QUEUE_NAME="javaboy_queue_name";publicstaticfinalStringJAVABOY_EXCHANGE_NAME="javaboy_exchange_name";publicstaticfinalStringJAVABOY_ROUTING_KEY="javaboy_routing_key";publicstaticfinalStringDLX_QUEUE_NAME="dlx_queue_name";publicstaticfinalStringDLX_EXCHANGE_NAME="dlx_exchange_name";publicstaticfinalStringDLX_ROUTING_KEY="dlx_routing_key";/***死信队列*@return*/@BeanQueuedlxQueue(){returnnewQueue(DLX_QUEUE_NAME,true,false,false);}/***死信交换*@return*/@BeanDirectExchangelxExchange(){returnnewDirectExchange(DLX_EXCHANGE_NAME,true,false);}/***绑定死信队列和死信交换*@return*/@BeanBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/***普通消息队列*@return*/@BeanQueuejavaboyQueue(){Mapargs=newHashMap<>();//设置消息过期时间args.put("x-message-ttl",1000*10);//设置死信交换args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);//Setdeadletterrouting_keyargs.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);returnnewQueue(JAVABOY_QUEUE_NAME,true,false,false,args);}/***普通开关*@return*/@BeanDirectExchangejavaboyExchange(){returnnewDirectExchange(JAVABOY_EXCHANGE_NAME,true,false);}/***绑定普通队列和对应开关*@return*/@BeanBindingjavaboyBinding(){returnBindingBuilder.bind(javaboyQueue()).to(javaboyExchange())。with(JAVABOY_ROUTING_KEY);}}这个配置代码虽然有点长,但是原理就是简单的配置可以分为两组。第一组配置死信队列,第二组配置普通队列。每个组由消息队列、消息交换和绑定组成。配置消息队列时,为消息队列指定死信队列。不熟悉的可以看之前的文章,传送门:RabbitMQ中的消息会过期吗?。配置队列中的消息过期时间时,默认时间单位为毫秒。接下来,我们为死信队列配置一个消费者,如下所示:;}}收到消息后,打印出来。就是这样。启动项目。最后我们在单元测试中发送消息:@SpringBootTestclassDelayQueueApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads(){System.out.println(newDate());rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME,QueueConfig.JAVABOY_ROUTING_,javay!K"hbo)}}这个没什么好说的,就是一个普通的消息发送,10秒后,这个消息会在死信队列的消费者中打印出来3.总结好了,以上就是我们使用RabbitMQ的两个思路作为延迟队列~有兴趣的朋友可以试试~本文转载自微信公众号“江南的一场小雨”,可以通过以下二维码关注,转载本文请联系江南一点鱼公众号。