1.并发控制简介在日常的开发过程中,可能会遇到并发控制的场景,比如控制并发请求数。那么如何在JavaScript中实现并发控制呢?在回答这个问题之前,我们先简单介绍一下并发控制。假设有6个待执行任务,我们想限制同时执行的任务数,即最多同时执行2个任务。当“正在执行的任务列表”中的任意一项任务完成后,程序会自动从“待办任务列表”中获取一个新的待办任务,并将该任务添加到“正在执行的任务列表”中。为了让大家更直观的理解上面的过程,阿宝哥特地画了下面三张图:1.1Stage1,1.2Stage2,1.3StageThree好了,引入并发控制后,阿宝哥在Github上使用async-pool这个库介绍异步任务并发控制的具体实现。https://github.com/rxaviers/async-poolRunmultiplepromise-returning&asyncfunctionswithlimitedconcurrencyusingnativeES6/ES7。2.并发控制async-pool的实现这个库提供了ES7和ES6两个不同版本。在分析它的具体实现之前,我们先看看它是如何使用的。2.1asyncPool的使用consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),i));等待同步池(2,[1000,5000,3000,2000],超时);在上面的代码中,我们使用了async-pool库提供的asyncPool函数来实现异步任务的并发控制。asyncPool函数的签名如下:functionasyncPool(poolLimit,array,iteratorFn){...}该函数接收3个参数:poolLimit(number类型):表示限制并发数;array(数组类型):表示任务数组;iteratorFn(函数类型):表示一个迭代器函数,用于处理每一个任务项,这个函数会返回一个Promise对象或者一个异步函数。对于上面的例子,使用asyncPool函数后,对应的执行过程如下:consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),i));awaitasyncPool(2,[1000,5000,3000,2000],timeout);//Calliterator(i=1000)//Calliterator(i=5000)//Poollimitof2reached,waitforthequickeronetocomplete...//1000finishes//Calliterator(i=3000)//达到2的池限制,等待更快的完成...//3000finishes//Calliterator(i=2000)//Itaration完成,等待运行完成...//5000finishes//2000finishes//解析,结果按数组顺序传递`[1000,5000,3000].2有了上面的注释信息,我们就可以大致了解asyncPool函数内部的控制流程了。我们先来分析下asyncPool函数的ES7实现。2.2asyncPoolES7实现asyncfunctionasyncPool(poolLimit,array,iteratorFn){constret=[];//存储所有异步任务constexecuting=[];//存储正在执行的异步任务for(constitemofarray){//调用iteratorFn函数创建异步任务constp=Promise.resolve().then(()=>iteratorFn(item,array));ret.push(p);//保存新的异步任务//当poolLimit值小于等于任务总数时,进行并发控制if(poolLimit<=array.length){//当任务为completed,从执行任务数组中取出完成的任务conste=p.then(()=>executing.splice(executing.indexOf(e),1));executing.push(e);//保存异步任务beingexecutedif(executing.length>=poolLimit){awaitPromise.race(executing);//等待更快的任务执行完成}}}returnPromise.all(ret);}以上代码中,Promise的特性。充分利用了all和Promise.race的功能,结合ES7提供的asyncawait特性,最终实现了并发控制功能。使用awaitPromise.race(executing);line语句,我们将等待执行任务列表中较快的任务完成,然后再继续执行下一个循环。asyncPoolES7的实现比较简单。接下来我们看看如何在不使用asyncawait特性的情况下实现同样的功能。2.3asyncPoolES6实现functionasyncPool(poolLimit,array,iteratorFn){leti=0;constret=[];//存储所有异步任务constexecuting=[];//存储正在执行的异步任务constenqueue=function(){if(i===array.length){returnPromise.resolve();}constitem=array[i++];//获取一个新的任务项constp=Promise.resolve().then(()=>iteratorFn(item,array));ret.push(p);letr=Promise.resolve();//当poolLimit值小于等于任务总数时,进行并发控制if(poolLimit<=array.length){//当任务完成,从执行任务数组中移除完成的任务conste=p.then(()=>executing.splice(executing.indexOf(e),1));executing.push(e);if(executing.length>=poolLimit){r=Promise.race(executing);}}//执行任务列表中较快的任务执行完毕后,会从array数组中获取新的待办任务returnr.then(()=>enqueue());};returnenqueue().then(()=>Promise.all(ret));}在ES6的实现版本中,核心控制逻辑是通过内部封装的enqueue函数实现的。当Promise.race(executing)返回的Promise对象完成时,会调用enqueue函数从array数组中获取新的待办任务。3.宝哥说了asyncPool库在ES7和ES6的具体实现中,我们都用到了Promise.all和Promise.race函数。其中手写Promise.all是一道常见的面试题。正好趁着这个机会,阿宝和大家手写简单版的Promise.all和Promise.race函数。3.1手写Promise.allPromise.all(iterable)方法会返回一个promise对象。当所有输入的promise对象状态变为resolved时,返回的promise对象会以数组的形式返回每个promise对象的resolved值。结果。当任何输入的承诺对象的状态变为拒绝时,返回的承诺对象将拒绝相应的错误消息。Promise.all=function(iterators){returnnewPromise((resolve,reject)=>{if(!iterators||iterators.length===0){resolve([]);}else{letcount=0;//计数器,用来判断是否所有任务都执行完了letresult=[];//结果数组for(leti=0;i
