当前位置: 首页 > 后端技术 > Node.js

Node.js+Redis实现消息队列最佳实践

时间:2023-04-03 19:26:11 Node.js

问题来源于最近开发一个前端监控小项目。由于在技术栈中采用了Node+Redis作为消息队列实现,这里记录一下在Node中通过Redis实现消息队列时的使用方法和注意事项什么是消息队列?消息队列是用于存储消息的队列结构。可用于解决分布式系统通信解耦系统模块、异步任务处理、请求调峰限流等。问题。既然叫队列,一般是一侧推送消息,另一侧消费消息;大概是以下过程。在我的需求中,我使用消息队列进行异步存储处理。我通过Node做了一个外部的日志接收层(也就是图中的KoaServer)来接收和上报日志。当KoaServer接收并完成接收后,会立即响应OK给用户,因为用户不需要感知后端日志的存储结果。的。因此,KoaServer收到日志后,可以将消息放入Redis消息队列中。在另一端,我启动了一个消费者程序(即上图中的日志存储模块,也是一个Node脚本)来读取MQ消息并进行存储操作。Redis如何做消息队列消息队列,其实有两种。一种是基于队列模型,另一种是基于订阅-发布模型。对于订阅-发布模式来说,就是多个消费者可以订阅一个频道的一条消息。当消息进入频道时,所有订阅者都会收到通知,所有订阅者都可以订阅同一条消息。加工(消费)。对于队列模型,当一条消息进入队列时,它只会在另一端出队一次。如果有多个消费者在等待这个队列,那么只有一个消费者可以拿到消息进行处理。在Redis中,以上两种模型可以分别通过pub/sub功能和链表结构来实现。对于我的日志接收场景,我期望的是不管我后端有多少入库消费者,我希望同一个报表只能入库一次。所以对于我来说,需要使用队列模型来实现消息队列,也就是使用Redis的List结构。CLI简单实验我们使用redis-cli来简单实验一下list结构如何作为消息队列。首先,使用lpush命令向redis中某个队列的左侧推送一条消息:lpushmy_mqabc这样,我们向my_mq队列推送一条内容为abc的消息。由于此时没有消费者,消息仍然存在于队列中。我们甚至可以再次压入第二条def消息,通过llen命令查看当前队列长度。接下来,我们在另一个命令行窗口输入:rpopmy_mq意思是从my_mq队列右边取一条消息。结果:Redis的List结构在阻塞模式下测试,作为消息队列,方便大家使用。提供了阻塞模式。阻塞和非阻塞有什么区别?我们使用一个新的命令行窗口来执行阻塞等待消息:brpopmy_mq0注意后面一定要加一个超时时间,0表示阻塞等待。然后,我们看到这里redis命令行被阻塞了,处于等待消息的状态:而如果使用rpop非阻塞命令,则会返回empty,直接退出等待:因此,可以发现阻塞和非阻塞模式,最大的区别:就是当消息队列为空时,阻塞模式不会退出等待,而非阻塞模式则直接返回空退出等待。brpop在等待的时候,我们向队列中推送一条消息:lpushmy_mq123可以看到阻塞模式下的消费者已经收到消息123,同时退出等待:这说明:阻塞模式:当队列is为空时(即没有消息时),会一直阻塞;等待消息时会退出非阻塞模式:当队列为空(即没有消息)时,不会阻塞,直接返回null退出。所以,所谓redis的Blocking就是当没有收到消息时,阻塞等待;当它等待消息时,它立即退出;它不会阻塞在一个循环中---也就是说,它在等待一条消息后不再阻塞和监听队列。这会给我们编写Node代码带来一些启发。Node怎么用到点子上。我们在Node中编写代码以使用与在cli界面中使用它相同的方式使用redis消息队列。但是我们需要考虑在消费者端如何编写代码来实现所谓的持续监听队列。毕竟,我们的消费者需要常驻进程来持续监控队列消息。这并不意味着进程在收到消息时退出。因此,我们需要编写一个常驻Node进程,可以持续等待redis队列消息。收到消息后,将由Node脚本处理;处理完后,继续等待队列中的下一条消息。等等等等。首先,我们可以编写这样的代码在Node中创建一个redis客户端:constredis=require('promise-redis-client')letclient=redis.createClient(...options)client.on('error',err=>{console.log('redislinkerror')})client.on('ready',()=>{console.log('redisready')})为了实现redis客户端创建时,会再次开启消息队列监控,我们将上述代码封装成一个模块,以promise方式导出://redis.jsconstredis=require('promise-redis-client')exports.createClient=function(){returnnewPromise((resolve,reject)=>{letclient=redis.createClient(...options)client.on('error',err=>{console.log('redis连接错误')reject(err)})client.on('ready',()=>{console.log('redisready')resolve(client)})})}OK,接下来我们可以在app中编写队列的消费代码了。js。为了更优雅地使用async/await,我们可以这样写一个startWait函数:asyncfunctionstartWaitMsg(client){...}然后,当客户端就绪时,启动它:const{createClient}=require('./redis.js')constc=createClient()client.then(asyncc=>{awaitstartWaitMsg(c)})最困难的部分是如何编写startWaitMsg函数。由于我们使用的是redis库的promise版本。因此,我们可以这样读取消息:asyncfunctionstartWaitMsg(client){awaitclient.rpop('my_mq')}但是这样,redis返回消息后,node继续执行,最后执行到startWaitMsg函数结束了。虽然整个Node进程不会因为redis连接没有断开而退出,但是此时node已经无法再执行client.rpop代码,也就无法再次从消息队列中获取新的消息。循环实现连续等待我们想到可以用循环来实现连续监听队列。于是,将代码改为:asyncfunctionstartWaitMsg(client){while(true){awaitclient.rpop('my_mq')}}这样就实现了rpop指令的连续执行。但是,如果你在rpop代码后面加上一行log打印,你会观察到client.rpop继续打印null。这是因为rpop命令是非阻塞的,所以当队列中没有消息时,它返回一个null,从而触发你的while循环继续执行。这样会导致我们的程序占用过多的cpu时间片,redis网络IO会有过多无谓的消耗。整个while循环是连续执行的。只有rpop行执行时,EventLoop才会短暂释放给其他代码,对脚本性能影响较大。国家提倡节能减排,这显然不是最优雅的。使用阻塞模式让我们尝试使用redis队列的阻塞模式。asyncfunctionstartWaitMsg(c){while(true){constres=awaitc.brpop('my_mq',0)console.log('receivedmessage',res)}}通过brpop指令可以阻塞brpop代码在这里。这里所谓的阻塞不是Node程序的阻塞,而是redis客户端本身的阻塞。其实对于Node进程来说,rpop和brpop都是非阻塞的异步IO操作,但是当消息队列为空时,rpop底层会立即返回null,所以node进程会解析一个空的,而brpop会在底层阻塞redis等消息,消息到达后通知Node进程resolve。因此,对于Node来说,brpop可以避免自己实现队列的内容轮询,可以在等待IO回调的同时,将CPU留给其他任务。这大大降低了Node进程的CPU消耗。redis断开连接无法继续消费的问题在代码运行过程中,又出现了一个新的问题:redis客户端在某些情况下会断开连接(可能是网络等原因)。通过分析日志,发现一旦出现连接异常,我们的消费者脚本就无法继续接收新消息(我的日志存储功能失效)。经过分析,发现问题的原因还是出在我们的while语句和brpop的配合上。当redis客户端对象发生连接异常时,会向当前等待的brpop代码抛出一个reject异常。我们再回顾一下上面代码的startWait函数:asyncfunctionstartWaitMsg(c){while(true){constres=awaitc.brpop('my_mq',0)console.log('receivedmessage',res)}}ifawaitbrpop行抛出一个拒绝异常。由于我们没有捕捉到异常,所以会从startWaitMsg函数中抛出异常,结果就是退出了while循环。想想怎么解决其实当连接出现问题的时候,我们需要重新连接客户端。但是redisclient会自动执行这个重连机制,所以我们的代码需要做的就是保证while循环在异常发生的时候能够恢复。所以,当异常发生时,我们继续:asyncfunctionstartWaitMsg(c){while(true){letres=nulltr??y{res=awaitc.brpop('my_mq',0)console.log('receivedmessage',res)}catch(err){console.log('brpoperror,re-brpop')continue}//...消息处理任务}}由于redis客户端内部重连过程将不再触发reject(只是断开连接时触发一次),所以continue之后的brpop会“阻塞”,再次等待,这样我们的消费者就可以正常生活了。最终代码客户端连接代码文件:redis.jsconstredis=require('promise-redis-client')exports.createClient=function(){returnnewPromise((resolve,reject)=>{letclient=redis.createClient(...选项)client.on('error',err=>{console.log('redisconnectionerror')reject(err)})client.on('ready',()=>{console.log('redisready')resolve(client)})})}app.jsconst{createClient}=require('./redis.js')constc=createClient()client.then(asyncc=>{awaitstartWaitMsg(c)//开始消息监听})asyncfunctionstartWaitMsg(c){while(true){letres=nulltr??y{res=awaitc.brpop('my_mq',0)console.log('receivedmessage',res)}catch(err){console.log('brpoperror,re-brpop')continue}//...消息处理任务}}运行半小时以上收不到消息的问题Afterrunningfor过了一段时间,又发现了一个问题新的问题就是我们的brpop经过了比较长的一段时间后,已经收不到消息了。队列中有消息,但无法消费。通过查询redis服务器上的连接列表,发现brpop的连接仍然存在(只是处于比较长的空闲状态),但是我们节点客户端没有错误日志。经过不断的测试和寻找解决方案,大概猜测问题的根源来自节点或服务器底层的socket超时,所以最好给brpop设置一个合适的超时时间(而不是简单的设置为0).例如:awaitc.brpop('my_mq',60*10)//10分钟后会超时,然后通过while继续重新brpop。参考自:https://github.com/andymccurdy/redis-py/issues/1210https://github.com/andymccurdy/redis-py/issues/1199https://github.com/andymccurdy/redis-py/issues/1210总结了redis的list数据结构,可以作为消息队列,实现Node中的队列模式,可以使用while(true)实现队列的持续循环监听。使用brpop阻塞指令,可以避免cpu空转监听队列。在Node中,注意redis连接断开时的错误处理,避免因为错误导致无法重新监听队列。