queue,它是数据结构中的线性表,其特点是数据必须从一端插入,然后从另一端删除。但是笔者今天的重点不是如何实现数据结构,我们可以看看如何借助队列来管理复杂的任务。队列在实际开发中的使用场景非常广泛。因为在一个复杂的系统中,总会有一些非常耗时的处理。这时,开发者不能要求系统提供实时处理和实时响应的能力。这时候我们可以通过队列来解决这类问题。开发者可以不断向队列中插入数据并为其生成唯一值(用于跟踪),同时根据当前系统的处理能力不断从队列中取出数据进行业务处理,从而减少需要同时处理大量复杂的业务处理,以增强系统的处理能力。服务端通常可以使用队列进行异步处理、系统解耦、数据同步、流量调峰等。如果需求比较简单,开发者可以直接使用数组进行处理。对于更复杂的需求,可以使用better-queue来解决。better-queue进一步扩展了队列,使其具有许多有用的功能。如:并行处理持久化(可扩展)存储批处理优先级队列合并过滤任务任务统计用法下面开始看看better-queue是如何使用的。代码风格importBetterQueuefrom"better-queue";//创建一个队列,并提供任务处理的回调函数//当回调被调用时,表示数据已经从队列中删除//然后取下一块从队列中获取数据并继续处理constq=newBetterQueue(function(input,cb){//从队列中获取数据并处理...constresult='xxxx'try{//如果成功,调用回调并返回结果cb(null,result);}catch(err){//否则返回错误cb(err)}})q.push(1)q.push({x:1})我们可以看到该库的代码风格仍然采用早期版本的Node风格的回调,如果在执行过程中出现错误,会将错误作为回调的第一个参数传递给回调函数。类似于:fs.readFile(filePath,(err,data)=>{if(err){console.log(err)return}console.log(data)})队列生成和使用首先我们可以构建存储结构和请求数据结构作业。//任务数据接口Job{//任务的唯一值,唯一确定当前任务id:string;//当前任务的状态:waiting,successful,failedstatus:'waiting'|'成功'|'失败的';//任务的请求参数,可以是id或者其他数据queryArgs?:any;//任务结果的返回结果:T;//任务报错信息err:Error;}然后开发队列的回调Function和新的任务queue://异步处理逻辑asyncfunctionasyncProcess(job:Job,cb:Function){constreq=job.queryArgs||job.idtry{//await异步请求处理,数据库访问或文件生成等耗时任务constresult=awaitquery('/xxx/xxx',req)cb(null,result)}catch(error){//生成错误cb(error)}}//创建队列constbetterQueue=newBetterQueue(asyncProcess)//对象存储,因为队列只进行任务处理,不包括数据存储//也可以使用mapconstjobById={}//Createqueuedatafor(leti=0;i<10000;i++){//创建一个作业constasyncJob:Job={id:`${id}`,queryArgs:{},status:'waiting'}//通过idjobById[asyncJob.id]=asyncJobbetterQueue.push(asyncJob)//取出数据完成请求后,调用cb(null,result)进入这里。on('finish',(result)=>{//修改任务状态并存储任何Jobresultjob.status='succeeded'job.result=result})//失败的调用cb(err)会进入这里。on('failed',(error:Error)=>{//修改任务状态并存储错误信息job.status='failed'job.err=error})}//获取任务,如果队列是未处理,则返回等待状态//如果队列已处理,则返回成功或失败functiongetJob(id:string){returnjobById[id]}任务存储后,我们可以获取整个任务信息在前端或者服务端根据id并发处理。这时候任务队列就会一项一项的进行业务处理。在上一个异步任务完成后(成功或失败))继续执行下一个任务。但这太慢了。同时,系统本应具备的处理能力也没有发挥出来。这个时候我们可以直接添加配置项concurrent。//创建一个队列constbetterQueue=newBetterQueue(asyncProcess,{concurrent:10})这样系统就可以顺序地同时处理多个任务。显着减少所有任务的处理时间。任务状态我们还可以通过getStats()获取当前任务状态,也就是getStats返回的信息:interfaceQueueStats{total:number;//处理的任务总数average:number;//平均处理时间successRate:number;//成功率,在0和1之间peak:number;//大多数任务在任何给定时间点排队}functioncb(){//获取当前队列的状态并打印完成数据比较。//例如:1/102/10conststats=betterQueue.getStats()console.log(`${stats.total}/10000`)}betterQueue.push(asyncJob).on('finish',(result)=>{//...//完成时回调cb()}).on('failed',(error:Error)=>{//...//完成时回调cb()})At这个时候,我们可以使用getStats来向前端显示当前的任务状态。队列控制better-queue提供了强大的队列控制能力。我们可以通过任务id直接取消一个任务。//直接取消任务betterQueue.cancel(jobId)我们也可以将cancelIfRunning设置为true来控制取消之前队列中之前的任务。//创建队列constbetterQueue=newBetterQueue(asyncProcess,{cancelIfRunning:true})betterQueue.push({id:'xxx'});//如果之前的id在队列中,则取消之前的任务,执行下一个任务betterQueue.push({id:'xxx'});我们也可以很方便地控制队列的暂停、恢复和销毁。//暂停队列运行betterQueue.pause()//恢复队列运行betterQueue.resume()//销毁队列并清理数据betterQueue.destroy()同时,开发者也可以发送一个对象来自新创建的队列行控件的回调函数。例如:constbetterQueue=newBetterQueue(function(file:File,cb:Function){varworker=someLongProcess(file);return{cancel:function(){//取消文件上传},pause:function(){//暂停文件处理},resume:function(){//恢复文件上传}}})betterQueue.push('/path/to/file.pdf')betterQueue.pause()betterQueue.resume()超时重试对于异步任务,如果执行失败,better-queue也提供了重试机制。constbetterQueue=newBetterQueue(asyncProcess,{//如果当前任务失败,可以重新请求,最多10次。如果任务失败超过10次,maxRetries:10,//重试等待时间1sretryDelay:1000,//超时时间5s,如果当前异步任务处理超过5s,则认为任务失败maxTimeout:5000,})当前任务队列被持久化存储在内存中,但是在开发服务端时,可能不会为了安全只放在内存中,我们可以传入store配置Item来持久化队列数据。//此时队列的插入和删除会和数据库进行交互constbetterQueue=newBetterQueue(asyncProcess,{store:{type:'sql',dialect:'sqlite',path:'/path/to/sqlite/file'}})//或者使用usebetterQueue.use({type:'sql',dialect:'sqlite',path:'/path/to/sqlite/file'})这个库目前支持SQLite和PostgreSQL,该项目还提供定制支持。betterQueue.use({connect:function(cb){//连接到你的数据库},getTask:function(taskId,cb){//查询任务},putTask:function(taskId,task,priority,cb){//优先保存任务},takeFirstN:function(n,cb){//删除前n项(根据优先级和传入顺序排序)},takeLastN:function(n,cb){//删除后n项(按照优先级和传入顺序排序)}})先进后出better-queue不仅提供了先进先出的逻辑,即使是先进先出的逻辑,只需要在队列中加入filo即可配置。//创建队列constbetterQueue=newBetterQueue(asyncProcess,{filo:true})任务过滤、合并和调整优先级我们可以在业务处理中过滤某些任务,只需要添加过滤功能即可。constbetterQueue=newBetterQueue(asyncProcess,{//推送任务前执行过滤filter:asyncfunction(job:Job,cb:Function){//执行业务处理前的预处理,校验数据,查数据库等比较有用//异步处理验证失败if(filterFail){cb('not_allowed')return}//预处理cb(null,job)}})对于相同id的任务,better-queue提供了合并功能:constbetterQueue=newBetterQueue(function(task,cb){console.log("我有%d%ss.",task.count,task.id);cb();},{merge:function(oldTask,newTask,cb){oldTask.count+=newTask.count;cb(null,oldTask);}})betterQueue.push({id:'apple',count:2})betterQueue.push({id:'apple',count:1})betterQueue.push({id:'orange',count:1})betterQueue.push({id:'orange',count:1})//这会打印出来//我有3个苹果。//我有2个橙子。//而不是//我有1个苹果。//我有1个橙子。优先级也是队列非常重要的配置。constbetterQueue=newBetterQueue(asyncProcess,{//决定优先处理哪些任务:function(job:Job,cb:Function){if(job.queryArgs==='xxxxx'){cb(null,10)return}if(job.queryArgs==='xxx'){cb(null,5)return}cb(null,1);}})批处理和预批处理也可以增强系统处理能力。使用批处理并不是立即处理任务,而是将多个任务组合成一个任务进行处理。批处理不同于并发。在这个配置中,当前队列中存储的数据在到达批处理配置之前不会被处理。constbetterQueue=newBetterQueue<(function(batch,cb){//batch是一个数组,最多3个//[job1,job2,job3]cb()},{//batchsizebatchSize:3,//有5秒内等待队列有3个,或者3秒内没有新任务加入//直接处理队列batchDelay:5000,batchDelayTimeout:3000})//目前也会触发,但是不会有新任务等待3秒后加入Task//如果一开始放入一条数据,等待2.5秒后放入第二条数据,则betterQueue.push(job1)betterQueue.push(job2)//1s内推送第一条数据,5s后也会执行三条数据入队列//队列数据达到3条,开始处理betterQueue.push(job3)我们也可以判断是否执行下一批添加先决条件。constbetterQueue=newBetterQueue<(function(batch,cb){//batch是一个数组,最多3个//[job1,job2,job3]cb()},{precondition:function(cb){//是当前网络状态isOnline(function(err,ok){if(ok){//为下一次批处理返回truecb(null,true);}else{//继续直到为真cb(null,false);}})},//每10秒执行一次precondition函数preconditionRetryTimeout:10*1000})当然,better-queue提供了更多的参数和配置,我们可以进一步学习,以便在现有业务任务的基础上管理复杂性。使负责任的任务更易于管理。同时也可以提高系统处理业务的能力。鼓励如果您觉得这篇文章不错,希望您能给我一些鼓励,帮我在我的github博客下star。博客地址参考better-queue