前言消息队列是一个存储数据的中间件,可以理解为一个容器。生产者将消息发布到队列中,消费者可以拉取消息进行消费。如果消费者无意消费,则消息队列会一直保留该消息,直到消费者有意消费为止。设计思路生产者通过lpush消息连接redis到指定通道消费者通过brpop阻塞连接redis死循环获取消息消费循环获取消息获取下一条消息要使用docker。只要确保您可以连接到redis服务即可。#使用docker拉取redis镜像dockerpullredis:latest#启动redis服务#--name后面是容器名,方便后续维护管理#-p后面是将容器服务的6379端口映射到容器服务的6379端口hostdockerrun-itd--nameredis-mq-p6379:6379redis#============docker常用基本操作(题外话)==================#拉取镜像dockerpullimagename#查看镜像dockerimages#删除镜像dockerrmiimagename#查看正在运行的容器(仅在启动时)dockerps#查看正在运行的容器(包括未启动的)dockerps-a#启动containerdockerstart容器名称/容器id#停止容器dockerstop容器名称/容器idNodejs连接初始化项目#创建文件夹并进入mkdirqueue-node-redis&&cdqueue-node-redis#yarninitializationyarninit-y#下载redis包,#之所以指定版本是为了尽量减少各位道友失败的几率。毕竟前端工具的迭代太快了。yarnaddredis@4.2.0创建lib和utils目录├──.gitignore├──lib├──package.json├──utils│└──redis.js└──yarn.lockutils/redis.jsconstredis=require("redis");constredisCreateClient=async(config)=>{try{constclient=redis.createClient({url:`redis://${config.host}:${config.port}`,});等待客户端连接();等待客户端选择(config.db);console.log("redis连接成功ess");returnclient;}catch(err){console.log("redisconnecterror");throwerr;}};module.exports={redisCreateClient,};index.js在项目根目录创建这个文件,测试redis连接是否成功const{redisCreateClient}=require("./utils/redis");consttest=async()=>{constclient=awaitredisCreateClient({host:"127.0.0.1",port:6379,db:0,});};test();出现如下图时,可以参考nodejs进阶视频讲解:进入学习minimist轻量级命令行参数解析引擎#安装minimistyarn添加minimist@1.2.6使用方法constsystemArg=require("minimist")(process.argv.slice(2));console.log(systemArg);#runnodeindex.js--nametest#output{_:[],name:'test'}正文从目录结构和文件开始创建,一步步教程目录结构变化├──config.js#配置文件├──lib│└──index.js#主目录入口文件├──package.json├──utils#工具函数库│└──redis.js└──yarn.lockconfig.js配置文件的思路是比执行代码更重要module.exports={//redis配置redis:{default:{host:"127.0.0.1",port:6379,password:"",db:0,},},//MessagequeuechannelsettingmqList:[{//消息通道名称name:"QUEUE_MY_MQ",//阻塞值超时配置brPopTimeout:100,//启用的任务数此配置需要PM启动才能生效instances:1,//redis配置keyredis:"default",},],};lib/index.js对配置做程序异常处理constsystemArg=require("minimist")(process.argv.slice(2));constconfig=require("../config");const{bootstrap}=require("./core");//程序自检//判断通道名是否输入if(!systemArg.name){console.error("ERROR:通道名不能为空");process.exit(99);}//通道队列配置constmqConfig=config.mqList.find((item)=>item.name===systemArg.name)??false;//如果配置不存在if(!mqConfig){console.error("ERROR:configurationnotobtained");process.exit(99);}//redis配置constredisConfig=config.redis[mqConfig.redis];if(!redisConfig){console.error("ERROR:redisconfigurationnotgetting");过程。exit(99);}//nodeindex.js--nameQUEUE_MY_MQbootstrap(mqConfig,redisConfig);lib/core.js背后的核心逻辑写在这里asyncfunctionbootstrap(config){console.log(config);}module.exports={bootstrap,};核心逻辑li??b/core.jsconst{redisCreateClient}=require("../utils/redis");asyncfunctionbootstrap(mqConfig,redisConfig){try{//创建redis连接constclient=awaitredisCreateClient(redisConfig);//通过无限循环阻止程序while(true){letres=null;console.log("队列执行");try{//从队列中获取任务,阻塞模式下获取任务的最大阻塞时间为config.queue.timeoutres=awaitclient.brPop(mqConfig.name,mqConfig.brPopTimeout);如果(res===null){继续;}console.log("TODO::任务处理",res);}catch(error){console.log("ERROR:redisbrPoperror",error);继续;}}}catch(err){//异常处理程序console.log("ERROR:",err);进程.exit(1);}}module.exports={bootstrap,};生成测试数据为接下来的测试先生成一些测试数据test/mockMq.jsconst{redisCreateClient}=require("../utils/redis");constconfig=require("../config");/**生成1000条测试消息*/constmockMq=async(key)=>{constclient=awaitredisCreateClient(config.redis.default);for(leti=0;i<1000;i++){//推送消息到队列awaitclient.lPush(key,"test"+i);}//获取队列长度constcount=awaitclient.lLen(key);console.log(`1000条测试消息生成完成,当前有${count}条消息`);//关闭redis连接client.quit();};mockMq("QUEUE_MY_MQ");验证脚本有效性#执行消息生成命令节点./test/mockMq.js#程序输出#redis连接成功#生成1000条测试消息,当前有1000条消息#执行并启动消费者节点./lib/index.js--nameQUEUE_MY_MQ#TODO::任务处理{key:'QUEUE_MY_MQ',element:'test0'}#TODO::任务处理.......#TODO::任务处理{key:'QUEUE_MY_MQ',element:'test999'}DefineJobPostscript至此,建议队列已经实现。当然后面还有一些优化,比如通过配置文件动态引入job,如何使用pm2启动消费队列等。可配置启动次数和启停控制。(ps:这里的坑很快就会被填上)当然除了这些,这个简单的队列还有很多不足之处。比如任务执行失败如何处理,消费后如何ack,没有使用成熟的主题协议,没有实现延迟队列。这些坑可能因为个人水平和redis本身的特性,很长时间都填不上。推荐使用成熟的套件如KafkaRabbitMq等更适合当前语言的套件。
