Node.js结合RabbitMQ延迟队列实现定时任务、setInterval两个API或者通过node-schedule等第三方库实现。这样对于简单的定时任务是可以的,但是对于过于复杂,需要高可用的系统会有如下缺点。存在一些消耗系统内存的问题。如果定时任务很多,长时间得不到释放,就会一直占用系统进程,消耗内存。单线程如何保证系统崩溃前的定时任务不受影响?多进程集群模式一致性保证?setTimeout和setInterval会有时间误差,对时间精度要求高的不能接受。RabbitMQTTL+DLX实现定时任务。RabbitMQ本身不支持它。它可以通过它提供的两个特性来实现,Time-To-LiveandExpiration,以及DeadLetterExchanges。通过下面的泳道图可以看到一条消息从发布到消费的全过程。DeadLetterQueueDeadLetterQueue的全称,Dead-Letter-Exchange,简称DLX,是RabbitMQ中的一种开关。如果一条消息在一段时间后没有被消费,它就会变成死信,重新发布到另一个DLX交换队列。因此得名死信队列。死信队列有几种情况。消息被拒绝。TTL已过期。队列达到最大长度。Routingkey注:Dead-Letter-Exchange也是一种常见的Exchange消息TTL。消息TTL是指消息的存活时间。RabbitMQ支持消息和队列设置TTL,分别如下:MessagesettingTTL:消息设置发送时设置TTL。它通过x-message-ttl或过期字段设置。单位为毫秒,表示消息的过期时间。每条消息的TTL可以不同。队列设置TTL:队列的设置是在消息进入队列时计算的。通过x-expires设置,队列中的所有消息都有相同的过期时间。当超过队列的超时设置时,消息会被自动清除。注意:如果设置了以上两种方式,消息的TTL会是两者中最小的。Nodejs运行RabbitMQ实现延迟队列。推荐使用amqplib库,一个由Node.js实现的RabbitMQ客户端。初始化RabbitMQrabbitmq.js//npminstallamqplibconstamqp=require('amqplib');letconnection=null;module.exports={connection,init:()=>amqp.connect('amqp://localhost:5672').then(conn=>{connection=conn;console.log('rabbitmqconnectsuccess');returnconnection;})}生产者/***由一个死亡信队列表*@param{Object}connnection*/asyncfunctionproducerDLX(connnection){consttestExchange='testEx';consttestQueue='testQu';consttestExchangeDLX='testExDLX';consttestRoutingKeyDLX='testRoutingKeyDLX';constch=awaitconnnection.createChannel();awaitch.assertExchange(testExchange,'direct',{durable:true});constqueueResult=awaitch.assertQueue(testQueue,{exclusive:false,deadLetterExchange:testExchangeDLX,deadLetterRoutingKey:testRoutingKeyDLX,});等待ch.bindQueue(queueResult.queue,testExchange);constmsg='你好世界!';骗子sole.log('producermsg:',msg);等待ch.sendToQueue(queueResult.queue,newBuffer(msg),{expiration:'10000'});ch.close();}consumerconsumer.jsconstrabbitmq=require('./rabbitmq.js');/***消费一个死信队列*@param{Object}connnection*/asyncfunctionconsumerDLX(connnection){consttestExchangeDLX='testExDLX';consttestRoutingKeyDLX='testRoutingKeyDLX';consttestQueueDLX='testQueueDLX';constch=awaitconnnection.createChannel();awaitch.assertExchange(testExchangeDLX,'direct',{durable:true});constqueueResult=awaitch.assertQueue(testQueueDLX,{exclusive:false,});等待ch.bindQueue(queueResult.queue,testExchangeDLX,testRoutingKeyDLX);awaitch.consume(queueResult.queue,msg=>{console.log('consumermsg:',msg.content.toString());},{noAck:true});}//消费消息rabbitmq.init().then(connection=>consumerDLX(connection));运行查看分别执行consumer和producer,可以看到producer在44秒后被释放收到消息,consumer54秒收到消息,实现10秒定时执行$nodeconsumer#Executeconsumer[2019-05-07T08:45:23.099][INFO]default-rabbitmqconnectsuccess[2019-05-07T08:45:54.562][INFO]默认-消费者消息:helloworld!$nodeproducer#executeproducer[2019-05-07T08:45:43.973][INFO]默认-rabbitmqconnectsuccess[2019-05-07T08:45:44.000][INFO]default-producermsg:helloworld!在管理控制台查看testQu队列。为我们定义的普通队列消息过期,就会变成死信。会路由到testQueueDLX队列形成死信队列源码地址:RabbitMQ延迟队列实现定时任务(Node.js客户端版Demo)作者:君梅链接:https://www.imooc.com/article。..来源:MOOCGithub:Node.js技术栈
