需求:A函数需要调用第三方API获取数据,第三方API本身是异步处理方式,调用{后会返回数据和状态data:"queryresult","status":"Processingasynchronously"},所以一段时间后需要调用第三方API获取数据。为了让用户在使用函数A时不会因为第三方API正在异步处理而等待,将用户请求添加到任务队列中,返回一些数据并关闭请求。然后定时从任务队列中取出任务,调用第三方API。如果返回状态为“异步处理”,则任务会重新加入任务队列。如果返回状态为“已处理”,则返回的数据将存储在数据库中。基于以上问题,我想到了用Node.js+Redissortedset来实现任务队列。Node.js实现了自己的应用程序API来接受用户请求,合并存储在数据库中的数据和API返回的一些数据返回给用户,并将任务添加到任务队列中。使用Node.js子进程和cron周期性的从任务队列中获取任务执行。任务队列设计过程中需要考虑的几个问题并行执行多个任务任务唯一性任务成功或失败后的处理以上问题的解决方案并行执行多个任务使用Promise.all实现任务唯一性的使用Redis排序设定实现。使用时间戳作为分数可以实现将排序集合作为列表使用。添加任务时,判断任务是否已经存在。任务取出执行时,任务分值置0,每次取出分值大于0的任务。执行,避免任务重复。任务执行成功后删除任务,任务执行失败后更新任务分数为当前时间戳,以便失败的任务重新加入任务队列。示例代码在最后//remote_api.js模拟第三方API'usestrict';constapp=require('express')();app.get('/',(req,res)=>{setTimeout(()=>{letarr=[200,300];//200表示成功,300表示失败,需要重新请求res.status(200).send({'status':arr[parseInt(Math.random()*2)]});},3000);});app.listen('9001',()=>{console.log('API服务监听端口:9001');});//producer.js自带的应用API,用于接受用户请求和向任务队列添加任务'usestrict';constapp=require('express')();constredisClient=require('redis').createClient();constQUEUE_NAME='queue:example';functionaddTaskToQueue(taskName,callback){//首先判断任务是否已经存在,存在:跳过,不存在:加入任务队列redisClient.zscore(QUEUE_NAME,taskName,(error,task)=>{if(error){console.log(error);}else{if(task){console.log('任务已经存在,不要添加相同的任务');callback(null,task);}else{redisClient.zadd(QUEUE_NAME,newDate().getTime(),taskName,(error,result)=>{if(error){回调(error);}else{回调(空,结果);}});}}});}app.get('/',(req,res)=>{lettaskName=req.query['task-name'];addTaskToQueue(taskName,(error,result)=>{if(error){console.log(error);}else{res.status(200).send('查询...');}});});app.listen(9002,()=>{console.log('生产者服务listeningport:9002');});//consumer.js定时获取任务,执行'usestrict';constredisClient=require('redis').createClient();constrequest=require('请求');constschedule=require('node-schedule');constQUEUE_NAME='queue:expmple';常量PARALLEL_TASK_NUMBER=2;//并行执行任务数functiongetTasksFromQueue(callback){//获取多个任务redisClient.zrangebyscore([QUEUE_NAME,1,newDate().getTime(),'LIMIT',0,PARALLEL_TASK_NUMBER],(error,tasks)=>{if(error){回调(错??误);}else{//将任务分值设置为0,表示正确处理if(tasks.length>0){lettmp=[];tasks.forEach((task)=>{tmp.push(0);tmp.push(task);});redisClient.zadd([QUEUE_NAME].concat(tmp),(error,result)=>{if(error){callback(error);}else{callback(null,tasks)}});}}});}functionaddFailedTaskToQueue(taskName,callback){redisClient.zadd(QUEUE_NAME,newDate().getTime(),taskName,(error,result)=>{if(error){callback(error);}else{回调(空,结果);}});}函数removeSucceedTaskFromQueue(任务名称,回调){redisClient.zrem(QUEUE_NAME,taskName,(error,result)=>{if(error){callback(error);}else{callback(null,result);}})}functionexecTask(taskName){returnnewPromise((resolve,reject)=>{letrequestOptions={'url':'http://127.0.0.1:9001','method':'GET','timeout':5000};request(requestOptions,(error,response,body)=>{if(error){resolve('failed');console.log(error);addFailedTaskToQueue(taskName,(error)=>{if(error){console.log(error);}else{}});}else{try{body=typeofbody!=='object'?JSON.parse(body):body;}catch(error){resolve('失败');控制台日志(错误);addFailedTaskToQueue(taskName,(error,result)=>{if(error){console.log(error);}else{}});返回;}if(body.status!==200){resolve('failed');addFailedTaskToQueue(taskName,(error,result)=>{if(error){console.log(error);}else{}});}else{resolve('成功');removeSucceedTaskFromQueue(taskName,(error,result)=>{if(error){console.log(error);}else{}});}}});});}//定时,每5秒获取新任务执行letjob=schedule.scheduleJob('*/5*****',()=>{console.log('获取新任务');getTasksFromQueue((error,tasks)=>{if(error){console.log(error);}else{if(tasks.length>0){console.log(tasks);Promise.all(tasks.map(execTask)).then((results)=>{console.log(results);}).catch((error)=>{console.log(error);});}}});});
