Redis+NodeJS实现了一个异步任务队列系统,可以处理海量数据数据需求。这些数据以字符串的形式给出,处理它们的步骤异步且耗时(平均处理一条数据需要25s)。如果以串行方式实现,耗时相当长:总耗时=数据量×单条数据处理时间T=N*t(N=100000;t=25s)总耗时=2500000秒≈695小时≈29天显然,我们不能简单地将数据一一处理。那么有什么办法可以减少处理时间呢?经过研究,发现使用异步任务队列是一个很好的方式。1、异步任务队列的原理我们可以把“处理单条数据”理解为一个异步任务,那么处理这10万条数据就可以转化为10万个等待执行的异步任务。我们可以把这10万条数据放到一个队列中,让任务处理器从队列中自发的获取并完成。可以有多个任务处理器,它们从队列中取出任务,同时进行处理。当任务队列为空时,表示所有任务都已领取;当所有的任务处理者都完成了任务,就意味着所有的任务都处理完了。基本原理如下图所示:首先,解决任务队列问题。在这个需求中,任务队列中的每个任务都包含需要处理的数据,数据以字符串的形式存在。为了方便,我们可以使用Redis的List数据格式来存储这些任务。由于项目是基于NodeJS的,我们可以使用PM2的Cluster模式,启动多个任务处理器,并行处理任务。以8核CPU为例,如果完全启用多进程,其理论处理时间将增加8倍,从29天缩短至3.6天。接下来,我们将从实际编码的角度来讲解以上内容的实现过程。2、使用NodeJS操作Redis异步任务队列是使用Redis实现的,所以我们需要部署一个单独的Redis服务。为了在本地开发中快速完成Redis的安装,我采用了Docker的方式(默认机器已经安装了Docker)。Docker拉取Redis镜像dockerpullredis:latestDocker启动Redisdockerrun-itd--nameredis-local-p6379:6379redis此时我们已经使用Docker启动了一个Redis服务,其对外IP和端口为127.0.0.1:6379另外,我们还可以在本地安装一个名为AnotherRedisDeskTopManager的Redis可视化工具,实时查看和修改Redis内容。在NodeJS中,我们可以使用node-redis来操作Redis。新建一个mqclient.ts文件,写入如下内容:import*asRedisfrom'redis'constclient=Redis.createClient({host:'127.0.0.1',port:6379})exportdefaultclientRedis本质上是一个数据库,而我们对数据库的操作无非就是增删改查。node-redis支持Redis的所有交互操作,但默认以回调函数的形式返回操作结果。为了能够使用async/await,我们可以新建一个utils.ts文件,将node-redis对Redis的各种操作封装成Promise的形式,方便我们后续的使用。importclientfrom'./mqClient'//获取Redis中某个key的内容exportconstgetRedisValue=(key:string):Promise=>newPromise(resolve=>client.get(key,(err,reply)=>resolve(reply)))//设置Redis中某个key的内容exportconstsetRedisValue=(key:string,value:string)=>newPromise(resolve=>client.set(key,value,resolve))//删除Redis中的key及其内容exportconstdelRedisKey=(key:string)=>newPromise(resolve=>client.del(key,resolve))另外在utils中也可以使用。其他常用的工具和方法都放在ts中,实现代码复用,保持代码整洁。为了在Redis中创建任务队列,我们??可以单独编写一个createTasks.ts脚本,将自定义任务插入到队列中。从'./utils'导入{TASK_NAME,TASK_AMOUNT,setRedisValue,delRedisKey}i>0;i--){client.lpush(TASK_NAME,`task-${i}`)}client.lrange(TASK_NAME,0,TASK_AMOUNT,async(err,reply)=>{if(err){控制台...)和任务总数TASK_AMOUNT(此处为20)。使用LPUSH方法将task-1到task-20的任务插入到TASK_NAME的列表中,如图:3.异??步任务处理首先新建一个index.ts文件作为整个异步任务队列的入口文件处理系统。importtaskHandlerfrom'./tasksHandler'importclientfrom'./mqClient'client.on('connect',()=>{console.log('Redis已连接!')})client.on('ready',async()=>{console.log('Redis准备好了!')awaittaskHandler()})client.on('error',(e)=>{console.log('Redis错误!'+e)})运行文件时,会自动连接Redis,并在就绪状态下执行任务处理器taskHandler()。在上一节的操作中,我们向任务队列中添加了20个任务,每个任务都是task-n形式的字符串。为了验证异步任务的实现,我们可以在taskhandlertaskHandler.ts中写一个demo函数来模拟真实的异步任务:)=>{console.log(`Handlingtask:${task}...`)resolve()},2000)})}上面的handleTask()函数会打印出当前任务的内容,并返回一个Promise,它很好地模拟了异步函数的实现方式。接下来我们将围绕这个函数来处理队列中的任务。其实至此,整个异步任务队列处理系统已经基本完成,只需要在taskHandler.ts中添加一点代码即可:import{popTask}from'./utils'task:string){/*...*/}exportdefaultasyncfunctiontasksHandler(){//从队列中取出一个任务consttask=awaitpopTask()//处理任务awaithandleTask(task)//递归运行awaittasksHandler()}最后,我们使用pm2启动4个进程来尝试运行整个项目:pm2start./dist/index.js-i4&&pm2logs可以看到,4个任务处理器是分开处理的完成队列中的所有任务后,它们之间不会相互影响。现在搞定了吗?不必要。为了测试我们的系统效率提高了多少,我们还需要统计完成队列中所有任务所花费的总时间。4、统计任务完成时间统计任务完成耗时,只需要执行下面的公式即可:总耗时=上一个任务完成时间-第一个任务最先得到解决的时间”get第一个任务“已获取”的问题,由于我们是通过PM2的Cluster模式启动应用,而从Redis队列中读取任务是一个异步操作,在多进程运行时,无法直接保证从队列中读取任务的顺序。附加标记判断,原理如下图所示:如图所示,由于绿色worker不能保证操作顺序,所以数字用问号表示。flag值从false到true,当且仅当yellowflag为false时才设置时间,这样在获取其他任务时,不能设置时间,因为yellowflag值已经为true,所以我们可以得到获得第一个任务的时间。在本文的例子中,黄色标志值和第一个任务获得的时间也存储在Redis中,分别命名为local_tasks_SET_FIRST和local_tasks_BEGIN_TIME。原理已经明白了,但是在实践中还有一个值得注意的地方。我们知道从Redis读写数据也是一个异步操作。由于我们有多个worker而只有一个Redis,所以读取黄色标记的值时很有可能会出现“冲突”的问题。比如worker-1在修改tag值为true时,worker-2正在读取tag值。由于时间关系,有可能worker-2读到的tag值还是false,这样就冲突了。为了解决这个问题,我们可以使用node-redlock工具来实现“加锁”操作。顾名思义,“锁定”操作可以理解为当worker-1读取和修改tag值时,不允许其他worker读取该值,即锁定tag值。当worker-1修改完tag值后,会释放锁,其他worker才可以读取tag值。node-redlock是Redis分布式锁Redlock算法的JavaScript实现。算法解释请参考https://redis.io/topics/distlock。值得注意的是,在使用node-redlock的过程中,如果要锁定一个已经存在的key,必须给key加上一个前缀locks:,否则会报错。回到utils.ts并编写一个setBeginTime()实用函数:exportconstsetBeginTime=async(redlock:Redlock)=>{//在读取标签值之前锁定它constlock=awaitredlock.lock(`lock:${TASK_NAME}_SET_FIRST`,1000)constsetFirst=awaitgetRedisValue(`${TASK_NAME}_SET_FIRST`)//当且仅当标签值不等于true时设置开始时间if(setFirst!=='true'){console.log(`${pm2tips}获取第一个任务!`)awaitsetRedisValue(`${TASK_NAME}_SET_FIRST`,'true')awaitsetRedisValue(`${TASK_NAME}_BEGIN_TIME`,`${newDate().getTime()}`)}//完成tag值的读写操作后,释放锁awaitlock.unlock().catch(e=>e)}加入到taskHandler()函数中:exportdefaultasyncfunctiontasksHandler(){+//获取第一个任务获取的时间+awaitsetBeginTime(redlock)//从队列中取出一个任务consttask=awaitpopTask()//处理任务awaithandleTask(task)//递归运行awaittasksHandler()}接下来解决“上一个任务完成时间”的问题。和前面的问题类似,由于任务执行的顺序无法保证,异步操作的完成时间也无法保证,所以我们还需要一个额外的标识来记录任务的完成情况。在Redis中创建一个初始值为0的标识符local_tasks_CUR_INDEX,当worker完成一个任务时让标识符递增。由于任务队列的初始长度是已知的(就是TASK_AMOUNT常量,Redis的local_tasks_TOTAL中也有写),当flag的值等于队列初始长度的值时,可以表示所有任务已经完成。如图所示,完成的任务会在黄色标记上加一。每当判断标记的值等于队列的初始长度值时,就可以表明任务已经完成。回到taskHandler()函数,添加如下内容:exportdefaultasyncfunctiontasksHandler(){+//获取标识值和队列初始长度+letcurIndex=Number(awaitgetRedisValue(`${TASK_NAME}_CUR_INDEX`))+consttaskAmount=Number(awaitgetRedisValue(`${TASK_NAME}_TOTAL`))+//等待新任务+if(taskAmount===0){+console.log(`${pm2tips}等待新任务。..`)+awaitsleep(2000)+awaittasksHandler()+return+}+//判断所有任务已经完成+if(curIndex===taskAmount){+constbeginTime=awaitgetRedisValue(`${TASK_NAME}_BEGIN_TIME`)+//getTotaltimespent+constcost=newDate().getTime()-Number(beginTime)+console.log(`${pm2tips}所有任务都完成了!时间成本:${cost}ms。${beginTime}`)+//初始化Redis的一些标识值+awaitsetRedisValue(`${TASK_NAME}_TOTAL`,'0')+awaitsetRedisValue(`${TASK_NAME}_CUR_INDEX`,'0')+awaitsetRedisValue(`${TASK_NAME}_SET_FIRST`,'false')+awaitdelRedisKey(`${TASK_NAME}_BEGIN_TIME`)+awaitsleep(2000)+awaittasksHandler()}//获取第一个获取任务的时间awaitsetBeginTime(redlock)//从队列中取出一个任务consttask=awaitpopTask()//处理任务awaithandleTask(task)+//任务完成后,需要添加一个到标志+try{+constlock=awaitredlock.lock(`lock:${TASK_NAME}_CUR_INDEX`,1000)+curIndex=awaitgetCurIndex()+awaitsetCurIndex(curIndex+1)+awaitlock.unlock().catch((e)=>e)+}catch(e){+console.log(e)+}+//递归+awaittasksHandler()+}//递归运行awaittasksHandler()}到目前为止,我们有解决了获取“finally”任务完成时间的问题,结合之前获取第一个任务的时间,可以得到总的运行时间。最后来看一下实际运行效果,我们例行添加了20个任务从task-1到task-20入队,然后启动4个进程运行:运行情况良好,从运行结果来看,4个进程处理20个平均需要2秒的任务只需要10秒,这完全符合假设。5.总结当面对大量需要处理的异步任务时,多进程+任务队列的方式是一个很好的解决方案。本文通过探索Redis+NodeJS的组合构建了一个异步任务队列处理系统,可以更好的完成原先的计划,但是还有很多问题需要改进。比如任务出错应该怎么办,系统能否支持不同类型的任务,能不能跑多个队列等等,都是值得思考的问题。如果读者有更好的想法,欢迎留言与我交流!(超过)