一、前言在开发过程中,有时会遇到需要控制任务并发执行数的需求。例如,爬虫程序可以通过限制其并发任务数来降低请求频率,从而避免因请求过于频繁而被ban的问题。接下来,本文介绍如何实现并发控制器。2.例子consttask=timeout=>newPromise((resolve)=>setTimeout(()=>{resolve(timeout);},timeout))consttaskList=[1000,3000,200,1300,800,2000];asyncfunctionstartNoConcurrentControl(){console.time(NO_CONCURRENT_CONTROL_LOG);awaitPromise.all(taskList.map(item=>task(item)));console.timeEnd(NO_CONCURRENT_CONTROL_LOG);}startNoConcurrentControl();以上示例代码使用Promise.all方法模拟6在并发执行任务的场景下,执行所有任务的总时间为3000毫秒。下面将通过这个例子来验证实现方法的正确性。3.实现由于并发执行的任务数量是有限的,因此需要一个数据结构来管理不断产生的任务。队列的**“先进先出”特性可以保证任务并发执行的顺序。在JavaScript中,你可以使用“数组来模拟队列”**:_queue.shift();}isEmpty(){returnthis._queue.length===0;}}对于每一个任务,你需要管理它的执行函数和参数:classDelayedTask{constructor(resolve,fn,args){this.resolve=resolve;this.fn=fn;this.args=args;}}接下来实现核心的TaskPool类,主要用于控制任务执行:classTaskPool{constructor(size){this.size=size;this.queue=newQueue();}addTask(fn,args){returnnewPromise((resolve)=>{this.queue.push(newDelayedTask(resolve,fn,args));if(this.size){this.size--;const{resolve:taskResole,fn,args}=this.queue.shift();taskResole(this.runTask(fn,args));}})}pullTask??(){if(this.queue.isEmpty()){return;}if(this.size===0){return;}this.size++;const{resolve,fn,args}=this。queue.shift();resolve(this.runTask(fn,args));}runTask(fn,args){constresult=Promise.resolve(fn(...args));result.then(()=>{this.size--;this.pullTask??();}).catch(()=>{this.size--;this.pullTask??();})returnresult;}}TaskPool包含三个关键方法:addTask:将新任务放入队列并触发任务池状态检测。如果当前任务池未加载满,则从队列中取出任务,放入任务池中执行runTask:执行当前任务。任务执行后,更新任务池的状态。这个时候就会触发主动拉取新任务的机制。pullTask??:如果当前队列不为空,任务池未满,会主动取出队列中的任务执行。接下来将上例的并发控制为2:constcc=newConcurrentControl(2);asyncfunctionstartConcurrentControl(){console.time(CONCURRENT_CONTROL_LOG);awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))console.timeEnd(CONCURRENT_CONTROL_LOG);}startConcurrentControl();执行过程如下:执行任务总耗时5000毫秒。4.高阶函数优化传参awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))每个task的参数都需要手动传,很麻烦,这里可以传**《高阶函数实现参数自动透传》**:addTask(fn){return(...args)=>{returnnewPromise((resolve)=>{this.queue.push(newDelayedTask(resolve,fn,args));if(this.size){this.size--;const{resolve:taskResole,fn:taskFn,args:taskArgs}=this.queue.shift();taskResole(this.runTask(taskFn,taskArgs));}})}}修改后的代码就简单多了:awaitPromise.all(taskList.map(cc.addTask(task)))5.优化dequeue操作数组一般都是基于一块**“连续内存”**存储,调用数组的shift方法时,先删除头部元素(时间复杂度O(1)),然后需要将未删除的元素左移一位(时间复杂度O(n)),所以移位操作的时间复杂度为O(n)。由于JavaScript语言的特性,V8在实现JSArray时提供了空间和时间权衡的方案。在不同的场景下,JSArray会在FixedArray和HashTable模式之间切换。在hashTable模式下,移位操作节省了左移的时间复杂度,其时间复杂度可以降低到O(1)。即便如此,shift仍然是一个耗时的操作。在数组元素较多,需要频繁移位操作的场景下,可以通过“reverse+pop”的方式进行优化。constBenchmark=require('benchmark');conssuite=newBenchmark.Suite;suite.add('shift',function(){letcount=10;constarr=generateArray(count);while(count--){arr.shift();}}).add('reverse+pop',function(){letcount=10;constarr=generateArray(count);arr.reverse();while(count--){arr.pop();}}).on('cycle',function(event){console.log(String(event.target));}).on('complete',function(){console.log('Fastestis'+this.filter('fastest').map('name'));console.log('\n')}).run({async:true})通过benchmark.js运行的benchmark数据,很容易看出哪个是第一个方法更高效:回顾之前Queue类的实现,由于只有一个数组存放任务,直接使用reverse+pop方法势必会影响任务的执行顺序。这里有必要介绍一下双数组的设计,一个数组负责入队操作,一个数组负责出队操作。classHighPerformanceQueue{constructor(){this.q1=[];//推送数据this.q2=[];//移位数据}push(value){returnthis.q1.push(value);}shift(){letq2=this.q2;if(q2.length===0){constq1=this.q1;if(q1.length===0){return;}this.q1=q2;//感谢@shaonialife同学指正q2=this.q2=q1.reverse();}returnq2.pop();}isEmpty(){if(this.q1.length===0&&this.q2.length===0){returntrue;}返回假;}}最后通过benchmarking验证优化效果:
