当前位置: 首页 > 科技观察

RabbitMQ死信机制真的可以作为延迟任务场景的解决方案吗?

时间:2023-03-13 08:03:58 科技观察

一篇贴合实际应用场景的好文章。我觉得对死信和延迟队列的分析非常好。我在不知不觉中阅读了它!在现场很常见。例如,如果订单在xx分钟内未付款,订单将被关闭。比如红包在XX分钟内没有抢到,红包将失效。所以说到延迟任务的实现,很多人可能第一时间想到轮询,也就是设置定时任务,稍有经验的开发者就知道了。轮询机制会给数据库带来很大的压力,当然对于小企业来说无所谓。如果是处理数据量很大的业务,轮询肯定行不通。而要保证高可用,就不得不涉及到分布式定时任务。不管怎么做,都非常麻烦。很多小聪明鬼都知道可以用消息队列来实现。确实,MQ的异步性和解耦性在任务延迟的场景下可以爆发出强大的战斗力。由于RabbitMQ应用广泛,对于延迟任务的实现自然有其解决方案。下面文章演示了基于SpringBoot环境使用RabbitMQ实现延时任务的方案。以文字和UML活动图的形式讲述了RabbitMQ所谓的“死信”机制是如何实现延迟消息的需求及其功能不足的。1.死信说到死信,balabala中的死信队列、死信开关等名词就出来了。这个术语有点抽象,但并不难理解。Deadletter死信,就是死了~比如你的producer给MQBroker发了一条消息,这条消息因为各种原因没有被消费,最终导致消息挂掉/死掉。你可以认为是死信,那么死信队列呢?死信开关呢?其实这两个东西跟普通的队列和交换机一样,没有本质区别。但是,他们可以设置为“死信”handler,就是由于各种原因没有被消费,最终死亡的消息,然后将消息转发给死信开关,他会处理死消息。这个设置,在RabbitMQ中处理是点对点的,即一个普通队列可以绑定一个死信交换。指定队列的死信交换需要设置队列的属性参数(arguments)。具体参数名称:绑定死信交换:x-dead-letter-exchange路由死信时使用的路由键:x-dead-letter-routing-key2,什么情况下会出现死信?在RabbitMQ中,有几种情况会出现死信:1、队列长度已满;是东西。TTL是timetolive的缩写,即生存时间。在RabbitMQ中,可以在队列和单个消息上设置TTL。如果设置在一个队列上,可以认为这个队列中所有消息的TTL都是设置值。队列TTL属性参数:x-message-ttl单条消息TTL参数:expiration如果设置了TTL值,消息在队列中停留时间超过TTL还没有被消费,消息队列会丢弃该消息,生成一个“deadletter""。产生死信后,如果队列配置了死信开关,消息流会转到绑定的死信开关,再由死信开关路由到死信队列。然后把死信队列推送给这个队列的消费者3.基于死信机制的延迟任务实现方案那么根据上面知识点1和2,对应的延迟任务实现方案自然就出来了具体解决方案:1.创建一个没有消费者的队列,设置TTL值,绑定死信开关2.所有需要延迟的消息都发送到这个队列3.死信开关绑定对应的死信r队列,它的消费者是处理延迟消息的服务。按照上述方案的逻辑,向队列发送消息后,必须等到消息过期——即指定的延迟一定时间后,消费者才会对消息进行处理。可以实现延迟任务的需求。活动图如下:3.RabbitMQ死信在Spring中的实现了解了原理和机制,我们先上手。依赖配置和具体application.yml文件的写法这里不再赘述。具体可以参考我之前的文章。最重要也是最核心的就是RabbitMQ的队列和交换机的配置。根据以上知识点,可以得出结论,只要配置好TTL和死信开关,就可以实现功能。那么这里直接贴出我写的配置类:="skypyb-dead-queue";`publicfinalstaticStringSKYPYB_ORDINARY_KEY="skypyb.key.ordinary.one";`publicfinalstaticStringSKYPYB_DEAD_KEY="skypyb.key.dead";`@Bean`publicDirectExchangeordinaryExchange(){`returnnewDirectExchange(SKYPYB_ORDINARY_EXCHANGE,false,true);`}`@Bean`publicDirectExchangedeadExchange(){`returnnewDirectExchange(SKYPYB_DEAD_EXCHANGE,false,true);`}`@Bean`publicQueueordinaryQueue(){`Maparguments=newHashMap<>();`//TTL5s`arguments.put("x-message-ttl",1000*5);`//绑定死信队列和死信交换`arguments.put("x-dead-letter-exchange",SKYPYB_DEAD_EXCHANGE);`arguments.put("x-dead-letter-routing-key",SKYPYB_DEAD_KEY);`returnnewQueue(SKYPYB_ORDINARY_QUEUE_1,false,false,true,arguments);`}`@Bean`publicdeadQueue(){`returnnewQueue(SKYPYB_DEAD_QUEUEse,false,falsetrue);`}`@Bean`publicBindingbindingOrdinaryExchangeAndQueue(){`returnBindingBuilder。deadQueue()).to(deadExchange()).with(SKYPYB_DEAD_KEY);`}`}`可以看到我定义了几个普通队列和死信队列相关的常量,并根据这些常量实例化了对应的开关、队列,并设置绑定关系。在实例化普通队列时,进行特殊处理;普通队列绑定死信交换机,指定死信路由键。它在指定其TTL值(5s到期)后被实例化。所以现在有了这样的配置,延迟消息需要的条件都已经实现了。写一个consumer和sender来测试一下。消费者:@RabbitListener(queues={RabbitBindConfig.SKYPYB_DEAD_QUEUE})`@Component`publicclassDeadReceiver{`privateLoggerlogger=LoggerFactory.getLogger(DeadReceiver.class);`@RabbitHandler`publicvoidonDeadMessage(@PayloadStringmessage,`@HeadersMapheaders,`ChannelIOException)throw`logger.info("死亡信队消费者接收消息:{}",message);`//deliverytag可以从headers中get出来`LongdeliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);`try{`channel.basicAck(deliveryTag,false);`}catch(Exceptione){`System.err.println(e.getMessage());`booleanredelivered=(boolean)headers.get(AmqpHeaders.REDELIVERED);`channel.basicNack(deliveryTag,false,!redelivered);`}`}`}`发送者:`@RunWith(SpringRunner.class)`@SpringBootTest(classes=Application.class)`publicclassRabbitmqTest{`@Autowired`privateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory。getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,"消息正文");`rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,"消息正文");`logger.info("-----消息发送-----");`}`}`最终控制台结果确实实现了延迟队列的功能:2020-01-1211:14:17.582INFO12032—[main]com.skypyb.test.RabbitmqTest:——消息已被sent—–2020-01-1211:14:22.599INFO10576—[cTaskExecutor-2]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者接收消息:消息正文2020-01-1211:14:22.599INFO10576—[cTaskExecutor-1]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者接收消息:除了队列TTL,还可以设置消息级别的TTL粒度。SpringAMQP中单条消息的TTL设置需要在MessageProperties类中设置,每条消息都会有一个内置的类。为了方便,SpringAMQP在消息发送过程中提供了一个hook,可以让我们设置Message的属性,即MessagePostProcessor@FunctionalInterface`publicinterfaceMessagePostProcessor{`MessagepostProcessMessage(Messagemessage)throwsAmqpException;`defaultMessagepostProcessMessage(Messagemessage,Correlation`messagemessage)p(Processmessage,Correlationmessagemessage){ost`}`}`由于他使用了@FunctionalInterface注解,为了方便我写一个lambda表达式,并设置单条消息的TTL为3秒:@RunWith(SpringRunner.class)@SpringBootTest(classes=Application.class)publicclassRabbitmqTest{@AutowiredprivateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory.getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate.convertAndSend(`RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`NAKEYDIRYBConfig.OR消息体",`(msg)->{`msg.getMessageProperties().setExpiration("3000");`returnmsg;`});`兔子Template.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,"消息正文");`logger.info("-----消息已发送-----");`}`}`修改代码再次发送,控制台输出:2020-01-1211:51:22.788INFO26232—[main]com.skypyb.test.RabbitmqTest:—–消息发送—–2020-01-1211:51:25.787INFO10576—[cTaskExecutor-4]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者接收message:消息体2020-01-1211:51:27.784INFO10576—[cTaskExecutor-5]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者收到消息:可以看到消息体,果然有消息接收时间区别刚好符合设置的消息TTL3s和队列TTL5s然而,这个功能是有缺陷的。这是使用RabbitMQ死信机制作为延迟任务不可避免会出现的不足。让我在下面解释一下。显示:@RunWith(SpringRunner.class)`@SpringBootTest(classes=Application.class)`publicclassRabbitmqTest{`@Autowired`privateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory.getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,"信息体");`rabbitTemplate.convertAndSend(`RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,"_KEY">{s,`g){sgetMessageProperties().setExpiration("3000");`returnmsg;`});`logger.info("-----消息已发送-----");`}`}`运行代码,结果,执行偏离想象...Consoleprint:2020-01-1215:00:19.371INFO9680—[main]com.skypyb.test.RabbitmqTest:—–messagesent—–2020-01-1215:00:24.380INFO10576—[cTaskExecutor-1]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者收到消息:消息体2020-01-1215:00:24.380INFO10576—[cTaskExecutor-3]c.s.rabbitmq.controller.DeadReceiver:死信队列消费者接收消息:消息体可见,当消费者消费消息,他们等待整整5秒!?这是为什么?这是因为RabbitMQ的队列是RabbitMQ的特性导致的FIFO有序队列,输入的消息都是顺序压入MQ的。而RabbitMQ只会对队尾的消息做超时判断,所以才会出现上面的情况。即即使第二条消息3秒后过期,因为第一条消息5秒后过期,RabbitMQ会等到第一条消息被丢弃后再判断第二条消息。最后出现了第一篇过期后第二篇过期的结果。结语其实就平时可能遇到的场景来说,使用RabbitMQ的死信机制就足够了。毕竟大部分延时任务都是有固定时间的,比如下单后半小时不付款就关闭订单的场景。只要场景是固定时间的延迟任务,RabbitMQ无疑可以很好地照顾这个需求。如果你回答了题目中的问题,就可以说RabbitMQ死信机制可以作为延迟任务场景的解决方案。但是由于RabbitMQ消息死亡不是异步的,所以是阻塞的。因此,它不能作为复杂延迟场景的解决方案——每条消息的死亡需要相互独立。