当前位置: 首页 > 科技观察

Redis+Node.js实现了一个可以处理海量数据的异步任务队列系统_0

时间:2023-03-14 19:31:18 科技观察

在最近的一次业务中,接到了一个处理大约10万条数据的请求。这些数据以字符串的形式给出,处理它们的步骤异步且耗时(平均处理一条数据需要25s)。如果以串行方式实现,耗时相当长:总耗时=数据量×单条数据处理时间T=N*t(N=100000;t=25s)总耗时=2500000秒≈695小时≈29天显然,我们不能简单地将数据一一处理。那么有什么办法可以减少处理时间呢?经过研究,发现使用异步任务队列是一个很好的方式。1、异步任务队列的原理我们可以把“处理单条数据”理解为一个异步任务,那么处理这10万条数据就可以转化为10万个等待执行的异步任务。我们可以把这10万条数据放到一个队列中,让任务处理器从队列中自发的获取并完成。可以有多个任务处理器,它们从队列中取出任务,同时进行处理。当任务队列为空时,表示所有任务都已领取;当所有的任务处理者都完成了任务,就意味着所有的任务都处理完了。基本原理如下图所示:首先,解决任务队列问题。在这个需求中,任务队列中的每个任务都包含需要处理的数据,数据以字符串的形式存在。为了方便,我们可以使用Redis的List数据格式来存储这些任务。由于项目是基于NodeJS的,我们可以使用PM2的Cluster模式,启动多个任务处理器,并行处理任务。以8核CPU为例,如果完全启用多进程,其理论处理时间将增加8倍,从29天缩短至3.6天。接下来,我们将从实际编码的角度来讲解以上内容的实现过程。2、使用NodeJS操作Redis异步任务队列是使用Redis实现的,所以我们需要部署一个单独的Redis服务。为了在本地开发中快速完成Redis的安装,我采用了Docker的方式(默认机器已经安装了Docker)。Docker拉取Redis镜像dockerpullredis:latestDocker启动Redisdockerrun-itd--nameredis-local-p6379:6379redis此时我们已经用Docker启动了一个Redis服务,它的对外IP和端口为127.0.0.1:6379。此外,我们还可以在本地安装一个名为AnotherRedisDeskTopManager的Redis可视化工具,实时查看和修改Redis内容。在NodeJS中,我们可以使用node-redis来操作Redis。新建一个mqclient.ts文件,写入如下内容:import*asRedisfrom'redis'constclient=Redis.createClient({host:'127.0.0.1',port:6379})exportdefaultclientRedis本质上是一个数据库,而我们对数据库的操作无非就是增删改查。node-redis支持Redis的所有交互操作,但默认以回调函数的形式返回操作结果。为了能够使用async/await,我们可以新建一个utils.ts文件,将node-redis对Redis的各种操作封装成Promise的形式,方便我们后续的使用。importclientfrom'./mqClient'//获取Redis中某个key的内容exportconstgetRedisValue=(key:string):Promise=>newPromise(resolve=>client.get(key,(err,reply)=>resolve(reply)))//设置Redis中某个key的内容exportconstsetRedisValue=(key:string,value:string)=>newPromise(resolve=>client.set(key,value,resolve))//删除RedisAkey及其内容exportconstdelRedisKey=(key:string)=>newPromise(resolve=>client.del(key,resolve))另外可以将其他常用的工具方法放在utils.ts中,实现代码复用,保证代码整洁.为了在Redis中创建任务队列,我们??可以单独编写一个createTasks.ts脚本,将自定义任务插入到队列中。import{TASK_NAME,TASK_AMOUNT,setRedisValue,delRedisKey}from'./utils'importclientfrom'./mqClient'client.on('ready',async()=>{awaitdelRedisKey(TASK_NAME)for(leti=TASK_AMOUNT;i>0;i--){client.lpush(TASK_NAME,`task-${i}`)}client.lrange(TASK_NAME,0,TASK_AMOUNT,async(err,reply)=>{if(err){console.error(err)return}console.log(reply)process.exit()})})在这个脚本中,我们从utils.ts中获取到每个Redis操作的方法,以及任务名称TASKNAME(这里是localtasks)和Total任务数量TASKAMOUNT(此处为20)。使用LPUSH方法将task-1到task-20的任务插入到TASKNAME的List中,如图:3.异??步任务处理首先新建一个index.ts文件作为整个异步任务队列的入口文件处理系统。importtaskHandlerfrom'./tasksHandler'importclientfrom'./mqClient'client.on('connect',()=>{console.log('Redisisconnected!')})client.on('ready',async()=>{console.log('Redisisready!')awaittaskHandler()})client.on('error',(e)=>{console.log('Rediserror!'+e)})会在运行文件Redis时自动连接,并在就绪状态下执行任务处理器taskHandler()。在上一节的操作中,我们向任务队列中添加了20个任务,每个任务都是task-n形式的字符串。为了验证异步任务的实现,我们可以在任务处理器taskHandler.ts中写一个demo函数来模拟一个真正的异步任务:functionhandleTask(task:string){returnnewPromise((resolve)=>{setTimeout(async()=>{console.log(`Handlingtask:${task}...`)resolve()},2000)})}上面的handleTask()函数会在执行2秒后打印出当前任务的内容。并返回一个Promise,很好地模拟了异步函数是如何实现的。接下来我们将围绕这个函数来处理队列中的任务。其实至此,整个异步任务队列处理系统已经基本完成,只需要在taskHandler.ts中添加一点代码:import{popTask}from'./utils'importclientfrom'./mqClient'functionhandleTask(task:string){/*...*/}exportdefaultasyncfunctiontasksHandler(){//从队列中获取一个任务consttask=awaitpopTask()//处理任务awaithandleTask(task)//递归运行awaittasksHandler()}最后我们使用PM2启动4进程并尝试运行整个项目:pm2start./dist/index.js-i4&&pm2logs可以看到4个任务处理器分别处理了队列中的所有任务,互不影响。现在搞定了吗?不必要。为了测试我们的系统效率提高了多少,我们还需要统计完成队列中所有任务所花费的总时间。4、统计任务完成时间统计任务完成耗时,只需要执行下面的公式即可:总耗时=上一个任务完成时间-第一个任务最先得到解决的时间”get第一个任务“获得”的问题。由于我们是通过PM2的Cluster模式启动应用,而从Redis队列中读取任务是一个异步操作,在多进程运行时无法直接保证从队列中读取任务的顺序。一个额外的标记来判断。原理如下图所示:如图所示,由于绿工不能保证操作顺序,所以数字用问号表示。获取第一个任务时,将黄色标志值从false设置为true。当且仅当黄旗为假时才设置时间。这样在获取到其他任务的时候,就不能设置时间了,因为黄色标志值已经为真了,所以我们可以获取到第一个任务获取到的时间。在本文的例子中,黄色标志值和第一个任务获得的时间也存储在Redis中,分别命名为localtasksSETFIRST和localtasksBEGINTIME。原理已经明白了,但是在实践中还有一个值得注意的地方。我们知道从Redis读写数据也是一个异步操作。由于我们有多个worker而只有一个Redis,所以读取黄色标记的值时很有可能会出现“冲突”的问题。比如worker-1在修改tag值为true时,worker-2正在读取tag值。由于时间关系,有可能worker-2读到的tag值还是false,这样就冲突了。为了解决这个问题,我们可以使用node-redlock工具来实现“加锁”操作。顾名思义,“锁定”操作可以理解为当worker-1读取和修改tag值时,不允许其他worker读取该值,即锁定tag值。当worker-1修改完tag值后,会释放锁,其他worker才可以读取tag值。node-redlock是Redis分布式锁Redlock算法的JavaScript实现。算法解释请参考https://redis.io/topics/distlock。值得注意的是,在使用node-redlock的过程中,如果要锁定一个已经存在的key,必须给key加上一个前缀locks:,否则会报错。回到utils.ts,写一个setBeginTime()工具函数:exportconstsetBeginTime=async(redlock:Redlock)=>{//读取标签值前锁定constlock=awaitredlock.lock(`lock:${TASK_NAME}_SET_FIRST`,1000)constsetFirst=awaitgetRedisValue(`${TASK_NAME}_SET_FIRST`)//当且仅当tag值不等于true时,设置开始时间if(setFirst!=='true'){console.log(`${pm2tips}Getthefirsttask!`)awaitsetRedisValue(`${TASK_NAME}_SET_FIRST`,'true')awaitsetRedisValue(`${TASK_NAME}_BEGIN_TIME`,`${newDate().getTime()}`)}//完成后tag值的读写操作,释放锁awaitlock.unlock().catch(e=>e)}加入到taskHandler()函数中:exportdefaultasyncfunctiontasksHandler(){+//获取第一个任务获取的时间+awaitsetBeginTime(redlock)//从队列中取出一个任务consttask=awaitpopTask()//处理任务awaithandleTask(task)//递归运行awaittasksHandler()}接下来解决“上一个任务完成时间”的问题。和前面的问题类似,由于任务执行的顺序无法保证,异步操作的完成时间也无法保证,所以我们还需要一个额外的标识来记录任务的完成情况。在Redis中创建一个初始值为0的标识符localtasksCURINDEX,当worker完成一个任务时让标识符递增。由于任务队列的初始长度是已知的(它是一个TASKAMOUNT常量,也写在Redis的localtasksTOTAL中),当flag的值等于队列初始长度的值时,可以表示所有任务都有已完成。如图所示,完成的任务会在黄色标记上加一。每当判断标记的值等于队列的初始长度值时,就可以表明任务已经完成。回到taskHandler()函数,添加如下内容:exportdefaultasyncfunctiontasksHandler(){+//获取标识值和队列初始长度+letcurIndex=Number(awaitgetRedisValue(`${TASK_NAME}_CUR_INDEX`))+consttaskAmount=Number(awaitgetRedisValue(`${TASK_NAME}_TOTAL`))+//等待新任务+if(taskAmount===0){+console.log(`${pm2tips}Watingnewtasks...`)+awaitsleep(2000)+awaittasksHandler()+return+}+//判断所有任务已经完成+if(curIndex===taskAmount){+constbeginTime=awaitgetRedisValue(`${TASK_NAME}_BEGIN_TIME`)+//获取总耗时+constcost=newDate().getTime()-Number(beginTime)+console.log(`${pm2tips}Alltaskswerecompleted!Timecost:${cost}ms.${beginTime}`)+//初始化一些标识值Redis+awaitsetRedisValue(`${TASK_NAME}_TOTAL`,'0')+awaitsetRedisValue(`${TASK_NAME}_CUR_INDEX`,'0')+awaitsetRedisValue(`${TASK_NAME}_SET_FIRST`,'false')+awaitdelRedisKey(`${TASK_NAME}_BEGIN_TIME`)+awaitsleep(2000)+awaittasksHandler()}//获取第一个任务获取的时间awaitsetBeginTime(redlock)//从队列中取出一个任务consttask=awaitpopTask()//处理任务aawithandleTask(task)+//任务完成后,需要标记位加一+try{+constlock=awaitredlock.lock(`lock:${TASK_NAME}_CUR_INDEX`,1000)+curIndex=awaitgetCurIndex()+awaitsetCurIndex(curIndex+1)+awaitlock.unlock().catch((e)=>e)+}catch(e){+console.log(e)+}+//recursion+awaittasksHandler()+}//递归运行awaittasksHandler()}至此,我们就解决了获取“上一个任务完成时间”的问题,结合获得第一个任务的时间,就可以得到总的运行时间。最后我们来看一下实际运行效果。我们例行将task-1到task-20的20个任务加入队列,然后启动4个进程运行:运行情况良好。从运行结果来看,4个进程只需要10秒就可以处理20个平均需要2秒的任务,完全符合假设。5.总结当面对大量需要处理的异步任务时,多进程+任务队列的方式是一个很好的解决方案。本文通过探索Redis+NodeJS的组合构建了一个异步任务队列处理系统,可以更好的完成原先的计划,但是还有很多问题需要改进。比如任务出错应该怎么办,系统能否支持不同类型的任务,能不能跑多个队列等等,都是值得思考的问题。