@[toc]定时任务有很多种,常见的定时任务比如日志备份,我们可能每天凌晨3点去备份,这种定时任务用cron表达式很容易实现,有还有一些特殊的定时任务。给大家看一下电影里的定时炸弹,3分钟就爆炸了。这种定时任务不好用cron来描述,因为开始时间是不确定的,有时候我们在开发的时候也会用到。我们会遇到类似的需求,比如:在电商项目中,我们下单的时候,一般需要在20分钟或者30分钟内付款,否则订单会进入异常处理逻辑并被取消,然后进入异常处理处理逻辑,可以看作是一个延迟队列。我买了一个智能砂锅,可以用来煮粥。上班前把所有的材料都放进锅里,然后定好时间开始煮粥,这样下班后就能喝上香喷喷的粥了。那么这个煮粥器的一条指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了就执行。公司会议预约系统会在会议预约成功后,在会议开始前半小时通知所有已预约会议的用户。安全工单超过24小时未处理,自动拉企业微信群提醒相关责任人。用户下单外卖后,当距离超时还有10分钟时,会提醒外卖小哥超时即将结束。...我们在很多场景下都需要延迟队列。本文以RabbitMQ为例,跟大家聊一聊延迟队列的玩法。总体来说,在RabbitMQ上实现定时任务有两种方式:利用RabbitMQ自带的消息过期和私有消息队列机制来实现定时任务。使用RabbitMQ的rabbitmq_delayed_message_exchange插件实现定时任务比较简单。我们分别来看一下这两种用法。1.使用plugin1.1安装插件首先我们需要下载rabbitmq_delayed_message_exchange插件,这是github上的一个开源项目,我们可以直接下载:https://github.com/rabbitmq/r...选择版本适合你,我这里选择最新的3.9.0版本。下载完成后,在命令行执行如下命令将下载的文件复制到Docker容器中:dockercp./rabbitmq_delayed_message_exchange-3.9.0.ezsome-rabbit:/plugins这里的第一个参数是文件地址在宿主机,第二个第二个参数是复制到容器的位置。接下来执行以下命令进入RabbitMQ容器:dockerexec-itsome-rabbit/bin/bash进入容器后,执行以下命令启用插件:rabbitmq-pluginsenablerabbitmq_delayed_message_exchange启用成功后,就可以了同样查看所有Installedplugins,看看有没有我们刚刚安装的插件,如下:rabbitmq-pluginslist命令完整的执行过程如下图所示:OK,配置完成之后,然后我们执行exit命令退出RabbitMQ容器。然后开始编码。1.2消息传递接下来,开始消息传递。首先我们创建一个SpringBoot项目,引入Web和RabbitMQ依赖,如下:项目创建成功后,在application.properties中配置RabbitMQ的基本信息,如下:spring.rabbitmq.host=localhostspring.rabbitmq.password=guestspring.rabbitmq。username=guestspring.rabbitmq.virtual-host=/接下来提供一个RabbitMQ配置类:@ConfigurationpublicclassRabbitConfig{publicstaticfinalStringQUEUE_NAME="javaboy_delay_queue";publicstaticfinalStringEXCHANGE_NAME="javaboy_delay_exchange";publicStringTANGE_CHfinal="x-delayed-message";@BeanQueuequeue(){returnnewQueue(QUEUE_NAME,true,false,false);}@BeanCustomExchangecustomExchange(){Mapargs=newHashMap<>();args.put("x-delayed-type","direct");返回新的CustomExchange(EXCHANGE_NAME,EXCHANGE_TYPE,true,false,args);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noarGS();}}这里的主要区别在于开关的定义不同。小伙伴们需要注意我们这里使用的switch是CustomExchange,它是Spring中提供的一个switch。创建CustomExchange时有五个参数。含义如下:交换机名称。开关型,这个地方是固定的。开关是否持久。如果没有绑定队列,是否删除交换。其他参数。最后一个args参数指定开关消息分发的类型。这种类型就是众所周知的direct、fanout、topic和header。使用哪种类型将决定交换机将来以何种方式分发消息。接下来我们创建一个消息消费者:@ComponentpublicclassMsgReceiver{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MsgReceiver.class);@RabbitListener(queues=RabbitConfig.QUEUE_NAME)publicvoidhandleMsg(Stringmsg){logger.info("handleMsg,{}",msg);}}只打印消息的内容。接下来,编写一个单元测试方法来发送消息:@SpringBootTestclassMqDelayedMsgDemoApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads()throwsUnsupportedEncodingException{Messagemsg=MessageBuilder.withBody(("你好江南小雨"+newDate()).getBytes("UTF-8")).setHeader("x-delay",3000)。建造();rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,RabbitConfig.QUEUE_NAME,msg);}}设置消息延迟的标题。好了,接下来启动SpringBoot项目,然后运行单元测试方法发送消息。最终的控制台日志如下:从日志中可以看到实现了消息延迟。2.DLX实现延迟队列2.1延迟队列实现思路延迟队列实现的思路也很简单,就是我们在上一篇文章中提到的DLX(死信开关)+TTL(消息超时)。我们可以把死信队列看成是延迟队列。具体来说,如果一条消息需要延迟30分钟执行,我们将这条消息的有效期设置为30分钟,并为这条消息配置死信开关和死信routing_key,不为这条消息设置消费者这个消息队列,那么30分钟后,这个消息因为还没有被消费者消费,就进入了死信队列。这时,我们有一个消费者在死信队列中“停留”。消息一进入死信队列,立即被消费。这就是延迟队列的实现思路,是不是很简单?2.2案例接下来,宋大哥用一个简单的案例来演示延迟队列的具体实现。首先准备一个RabbitMQ启动。然后我们创建一个SpringBoot工程,引入RabbitMQ依赖:然后在application.properties中配置RabbitMQ的基本连接信息:spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672接下来,让我们配置两个消息队列:一个普通队列和一个死信队列:publicstaticfinalStringJAVABOY_EXCHANGE_NAME="javaboy_exchange_name";publicstaticfinalBOKYEOUTStringJAVA_="javaboy_routing_key";publicstaticfinalStringDLX_QUEUE_NAME="dlx_queue_name";publicstaticfinalStringDLX_EXCHANGE_NAME="dlx_exchange_name";publicstaticfinalStringDLX_ROUTING_KEY="dlx_routing_key";/***死信队列*/@redlxQueue(){returnnewQueue(DLX_QUEUE_NAME,true,false,false);}/***死信交换*@return*/@BeanDirectExchangedlxExchange(){returnnewDirectExchange(DLX_EXCHANGE_NAME,真,假);}/***绑定死信队列和死信交换*@return*/@BeanBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).与(DLX_ROUTING_KEY);}/***普通消息队列*@return*/@BeanQueuejavaboyQueue(){Mapargs=newHashMap<>();//设置消息过期时间args.put("x-message-ttl",1000*10);//设置死信交换args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);//设置死信routing_keyargs.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);返回新队列(JAVABOY_QUEUE_NAME,真,假,假,参数);}/***通用开关*@return*/@BeanDirectExchangejavaboyExchange(){returnnewDirectExchange(JAVABOY_EXCHANGE_NAME,true,false);}/***绑定普通队列和对应的switch*@return*/@BeanBindingjavaboyBinding(){returnBindingBuilder.bind(javaboy队列()).to(javaboyExchange()).with(JAVABOY_ROUTING_KEY);}}虽然这个配置代码有点长,但是原理其实简单的配置可以分为两组,第一组配置死信队列,第二组配置普通队列。每个组由消息队列、消息交换和绑定组成。配置消息队列时,为消息队列指定死信队列。不熟悉的可以看之前的文章,传送门:RabbitMQ中的消息会过期吗?.配置队列中的消息过期时间时,默认时间单位为毫秒。接下来,我们为死信队列配置一个消费者,如下所示:@RabbitListener(queues=QueueConfig.DLX_QUEUE_NAME)publicvoidhandle(Stringmsg){logger.info(msg);}}收到消息后,打印出来。就是这样。启动项目。最后我们在单元测试中发送消息:@SpringBootTestclassDelayQueueApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads(){System.out.println(newDate());rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME,QueueConfig.YRINGJAVA_BOK_"你好javaboy!");}}这个没什么好说的,就是一个正常的消息发送,这条消息会在10秒后在死信队列的consumer中打印出来。3.总结,这里是我们使用RabbitMQ做延迟队列的两个思路~有兴趣的朋友可以试试~