当前位置: 首页 > 后端技术 > Node.js

使用RabbitMQ的死信队列做定时任务是很常见的业务场景

时间:2023-04-03 11:17:52 Node.js

开发中的定时任务是非常常见的业务场景。在代码层面,Node.js可以使用setTimeout和setInerval或node-schedule等基本语法。库实现部分目的,在第三方服务上可以使用Redis的KeyspaceNotification或者Linux自带的crontab来做定时任务。RabbitMQ作为一个消息中间件,也可以利用它的死信队列来达到定时任务的目的。本文使用Node.js作为演示语言,使用amqplib来操作RabbitMQ。RabbitMQ中有一个exchange叫DLX,全称是Dead-Letter-Exchange,可以称为死信交换。当消息成为队列中的死消息时,它将被重新发送到另一个交换器。这个exchange就是DLX,DLX绑定的队列叫做死信队列。消息变成死信一般有以下几种情况:消息被拒绝,requeue参数设置为false。消息过期队列达到最大长度。DLX也是普通交换机,和普通交换机没什么区别。它可以在任何队列中。指定其实就是设置一个队列的属性。当这个队列出现死信时,RabbitMQ会自动将消息重新发布到设置的DLX中,然后再路由到另一个队列,即死信队列。将DLX添加到队列中,需要在创建队列时设置其deadLetterExchange和deadLetterRoutingKey参数。deadLetterRoutingKey参数是可选的,表示DLX指定的路由键。如果未指定,将使用原始队列的路由键。constamqp=require('amqplib');constmyNormalEx='my_normal_exchange';constmyNormalQueue='my_normal_queue';constmyDeadLetterEx='my_dead_letter_exchange';constmyDeadLetterRoutingKey='my_dead_letter_letter_exchange';constmyDeadLetterRoutingKey='my_dead_letter_letter_exchange';//localhost').then((conn)=>{connection=conn;returnconn.createChannel();}).then((ch)=>{channel=ch;ch.assertExchange(myNormalEx,'direct',{durable:false});returnch.assertQueue(myNormalQueue,{exclusive:false,deadLetterExchange:myDeadLetterEx,deadLetterRoutingKey:myDeadLetterRoutingKey,});}).then((ok)=>{channel.bindQueue(ok.queue,myNormalEx);channel.sendToQueue(ok.queue,Buffer.from('hello'));setTimeout(function(){connection.close();process.exit(0)},500);}).catch(console.错误);上面的代码首先声明了一个exchangemyNormalEx,然后声明了一个queuemyNormalQueue,并在声明queue时通过设置其deadLetterExchange参数为其添加了一个DLX。所以当队列myNormalQueue中的一条消息变成死信时,它会被发布到myDeadLetterEx。过期时间(TTL)在RabbbitMQ中,可以为消息和队列设置过期时间。当通过队列属性设置过期时间时,队列中的所有消息都具有相同的过期时间。为消息设置单独的过期时间时,每条消息的TTL可能不同。如果两种方法一起使用,则消息的TTL是两者中较小的一个。一旦消息在队列中的存活时间超过设定的TTL值,就会成为“死消息”(DeadMessage),消费者将无法再接收到消息。给每条消息设置TTL就是在发送消息的时候设置过期参数,单位是毫秒。constamqp=require('amqplib');constmyNormalEx='my_normal_exchange';constmyNormalQueue='my_normal_queue';constmyDeadLetterEx='my_dead_letter_exchange';constmyDeadLetterRoutingKey='my_dead_letter_letter_exchange';constmyDeadLetterRoutingKey='my_dead_letter_letter_exchange';//localhost').then((conn)=>{connection=conn;returnconn.createChannel();}).then((ch)=>{channel=ch;ch.assertExchange(myNormalEx,'direct',{durable:false});returnch.assertQueue(myNormalQueue,{exclusive:false,deadLetterExchange:myDeadLetterEx,deadLetterRoutingKey:myDeadLetterRoutingKey,});}).then((ok)=>{channel.bindQueue(ok.queue,myNormalEx);channel.sendToQueue(ok.queue,Buffer.from('hello'),{expiration:'4000'});setTimeout(function(){connection.close();process.exit(0)},500);}).catch(console.error);上面代码向队列发送消息时,通过传递{expiration:'4000'}设置这条消息的过期时间为4秒,设置消息4秒后过期,消息不一定会被丢弃4秒后或输入死信,只有当消息到达队头,即将被消费时,才会判断是否过期,如果没有过期,就会被消费者消费,如果已经过期,就会被消费者消费因为队列中的消息被删除或者成为死信定时任务,过期后会成为死信,死信会发布到消息所在队列的DLX,所以通过设置过期时间对于消息,然后消费消息所在队列DLX绑定的队列,达到定时处理一个任务的目的。简单的说,当有一个队列queue1时,它的DLX为deadEx1,deadEx1绑定了一个队列deadQueue1,当队列queue1中的一条消息过期成为死信时,会发布到deadEx1,通过消费queuedeadQueue1中的消息等同于消费了queue1中的死信消息过期。消费死信队列的代码如下:'/localhost').then((conn)=>{returnconn.createChannel();}).then((ch)=>{channel=ch;ch.assertExchange(myDeadLetterEx,'direct',{durable:false});returnch.assertQueue(myDeadLetterQueue,{exclusive:false});}).then((ok)=>{channel.bindQueue(ok.queue,myDeadLetterEx,myDeadLetterRoutingKey);channel.consume(ok.queue,(msg)=>{console.log("[x]%s:'%s'",msg.fields.routingKey,msg.content.toString());},{noAck:true})}).catch(控制台错误);这里需要注意的是,如果声明的myDeadLetterEx是direct类型,那么在为其绑定队列时必须指定一个BindingKey,即这里的myDeadLetterRoutingKey。如果不指定Bindingkey,则需要将myDeadLetterEx声明为扇出类型。