本文转载自微信公众号《编程杂技》,作者theanarkh。转载本文请联系编程杂技公众号。1后台需求包括以下几个场景1对称解密,非对称解密2压缩,解压3大量文件的增删改查4处理大量字符串,解析协议以上场景非常耗时-消费,解密,压缩,文件操作,nodejs使用内置的线程池来支持异步。但是处理字符串和解析协议是纯粹的cpu消耗操作。而且nodejs好像对解密的支持不是很好。我用的是纯js解密库,所以无法在nodejs主线程中处理。尤其是rsa解密,非常耗时。那么是时候探索解决方案了,nodejs提供了多线程的能力。所以很自然地选择了这个选项。但这只是一个初步的想法和计划。因为nodejs虽然提供了多线程的能力,但是并没有提供应用层的线程池。所以如果我们单纯的使用多线程,一个请求一个线程,这显然是不现实的。我们必须实现自己的线程池。本文分享的内容就是这个线程池的实现。线程池的设计涉及到很多方面。对于纯CPU任务,线程数和CPU内核数必须相等才能达到最佳性能。否则线程过多引起的上下文切换会导致性能下降。对于io类型的任务,理论上线程越多越好,因为可以更早的向硬盘下达命令,硬盘会不断优化处理请求。想象一下,如果发出一个命令,硬盘处理了一个,然后发送另一个命令,处理另一个,显然效率很低。当然,线程数并不是越多越好。线程过多会导致系统负载过高,上下文切换过多也会导致性能下降。我们来看看线程池的实现。2设计思路首先根据配置创建多个线程(分为预创建和惰性创建),然后将提交任务的接口暴露给用户,调度中心负责接收任务,然后选择线程进行处理根据策略的任务。子线程一直在轮询是否有任务要处理。处理完毕,通知调度中心。我们来看看2.1的具体实现和用户通信的数据结构提交任务时,调度中心返回一个UserWork对象。用户可以使用该对象与调度中心进行通信。2.2调度中心的实现调度中心的实现大致分为以下逻辑。2.2.1初始化构造函数(options={}){this.options=options;//线程池中任务总数this.totalWork=0;//子线程队列this.workerQueue=[];//这个核心线程数。coreThreads=~~options.coreThreads||config.CORE_THREADS;//线程池最大线程数,如果不支持动态扩展,最大线程数等于核心线程数this.maxThreads=options.expansion!==false?Math.max(this.coreThreads,config.MAX_THREADS):this.coreThreads;//工作线程处理任务的方式this.sync=options.sync!==false;//处理策略当超过任务队列长度时this.discardPolicy=options.discardPolicy?options.discardPolicy:DISCARD_POLICY.NOT_DISCARD;//是否预创建子线程this.preCreate=options.preCreate===true;this.maxIdleTime=~~options.maxIdleTime||config.MAX_IDLE_TIME;this.pollIntervalTime=~~options.pollIntervalTime||config.POLL_INTERVAL_TIME;this.maxWork=~~options.maxWork||config.MAX_WORK;//是否预创建线程池this.preCreate&&this.preCreateThreads();}从初始化代码可以看出线程池大致支持Ability。核心线程数最大线程数线程数过载时的处理策略,过载阈值子线程空闲退出时间和轮询任务时间是否预创建线程池是否支持动态扩容核心线程数在任务数未达到阈值gather时为工作线程。它是处理任务的主要力量。当任务数达到阈值后,如果支持动态扩展(可配置),会创建新的线程来处理更多的任务。一旦负载变低,线程会在空闲时间达到阈值时自动退出。如果扩容线程数达到阈值,有新任务到来,则按照丢弃策略进行相关处理。2.2.2创建线程newThread(){let{sync}=this;constworker=newWorker(workerPath,{workerData:{sync,maxIdleTime:this.maxIdleTime,pollIntervalTime:this.pollIntervalTime,}});constnode={worker,//本线程处理的任务数queueLength:0,};this.workerQueue.push(node);constthreadId=worker.threadId;worker.on('exit',(status)=>{//异常退出补充线程,正常退出不添加=threadId;});});//与子线程通信worker.on('message',(result)=>{const{work,event,}=result;const{data,error,workId}=work;//通过workId获取对应的userWorkerconstuserWorker=workPool[workId];deleteworkPool[workId];//任务数减一node.queueLength--;this.totalWork--;switch(event){case'done'://通知用户,任务完成userWorker.emit('done',data);break;case'error'//通知用户任务错误if(EventEmitter.listenerCount(userWorker,'error')){userWorker.emit('error',error);}break;default:break;}});worker.on('error',(...rest)=>{console.log(...rest)});returnnode;}创建线程主要是调用nodejs提供的模块创建,然后监听子线程的退出以及消息和错误事件。如果异常退出,则添加线程。调度中心维护一个子线程队列。记录每个子线程(worker)的实例数和任务数。2.2.3选择线程执行任务selectThead(){letmin=Number.MAX_SAFE_INTEGER;leti=0;letindex=0;//找到任务数最少的线程,把任务交给他for(;i{letthread;//如果没有线程,创建一个if(this.workerQueue.length){thread=this.selectThead();//任务队列不为空if(thread.queueLength!==0){//子线程数还没有达到核心线程数,则新建线程为处理if(this.workerQueue.lengththis.maxWork){//任务总数已经达到阈值,但是已经线程数还没有达到阈值,则创建if(this.workerQueue.length{userWork.emit('done',result);});}catch(error){resolve(userWork);setImmediate(()=>{userWork.emit('错误',错误);});}返回;caseDISCARD_POLICY.DISCARD_OLDEST:thread.worker.postMessage({cmd:'delete'});break;caseDISCARD_POLICY.DISCARD:returnreject(newError('discard'));caseDISCARD_POLICY.NOT_DISCARD:break;default:break;}}}}}else{thread=this.newThread();}//生成一个任务idconstworkId=this.generateWorkId();//新建一个工作交给对应的子线程constwork=newWork({workId,filename,options});constuserWork=newUserWork({workId,threadId:thread.worker.threadId});thread.queueLength++;this.totalWork++;thread.worker.postMessage({cmd:'add',work});resolve(userWork);})}提交任务的功能比较复杂。提交任务时,调度中心会根据当前负载情况和线程数决定如何处理任务。如果可以处理,则将任务交给选定的子线程。最后返回一个UserWorker对象给用户。2.3调度中心与子线程通信数据结构用户代码this.data=null;//执行错误this.error=null;//执行时输入参数this.options=options;}}一个任务对应一个id,目前只支持文件的执行方式,以及将来会支持字符串。2.4子线程的实现子线程的实现主要分为几个部分2.4.1监听调度中心下发的命令parentPort.on('message',({cmd,work})=>{switch(cmd){case'delete':returnqueue.shift();case'add':returnqueue.push(work);}});2.4.2轮询是否有待处理的任务functionpoll(){constnow=Date.now();if(now-lastWorkTime>maxIdleTime&&!queue.length){process.exit(0);}setTimeout(async()=>{//进程任务poll();}},pollIntervalTime);}//轮询判断是否有任务poll();不断轮询是否有待处理的任务,如果没有且空闲时间达到阈值,则退出。2.4.3任务处理方式分为同步和异步while(queue.length){constwork=queue.shift();try{const{filename,options}=work;constasyncFunction=require(filename);if(!isAsyncFunction(asyncFunction)){return;}lastWorkTime=now;constresult=awaitasyncFunction(options);work.data=result;parentPort.postMessage({event:'done',work});}catch(error){work.error=错误.toString();parentPort.postMessage({event:'error',work});}}用户需要导出一个异步函数。该方案主要用于在执行过程中向用户传递参数。并实现同步。处理后通知调度中心。下面是异步的处理方式,子线程不需要同步等待用户的代码结果。constarr=[];while(queue.length){constwork=queue.shift();try{const{filename}=work;constasyncFunction=require(filename);if(!isAsyncFunction(asyncFunction)){return;}arr。push({asyncFunction,work});}catch(error){work.error=error.toString();parentPort.postMessage({event:'error',work});}}arr.map(async({asyncFunction,work})=>{try{const{options}=work;lastWorkTime=now;constresult=awaitasyncFunction(options);work.data=result;parentPort.postMessage({event:'done',work});}catch(e){work.error=error.toString();parentPort.postMessage({event:'done',work});}})最后还有一些配置和定义的功能。module.exports={//最大线程数MAX_THREADS:50,//线程池最大任务数MAX_WORK:Infinity,//默认核心线程数CORE_THREADS:10,//最大空闲时间MAX_IDLE_TIME:10*60*1000,//子线程轮询时间POLL_INTERVAL_TIME:10,};//丢弃策略constDISCARD_POLICY={//errorABORT:1,//在主线程中执行CALLER_RUNS:2,//丢弃最早的taskDISCARD_OLDEST:3,//丢弃DISCARD:4,//不丢弃NOT_DISCARD:5,};Supportmultipletypesofthreadpoolsoptions){super({...options,sync:true});}}//cpu类型任务的线程池,线程数与cpu核数相同,并且不支持动态扩展classCPUThreadPoolextendsThreadPool{constructor(options){super({...options,coreThreads:cores,expansion:false});}}//线程池只有一个线程,类似于消息队列classSingleThreadPoolextendsThreadPool{constructor(options){super({...options,coreThreads:1,expansion:false});}}//固定线程数的线程池不支持动态扩展线程有很多细节需要说明被考虑。下面是一个性能测试的例子。3测试const{MAX}=require('./constants');module.exports=asyncfunction(){letret=0;leti=0;while(i++