本文转载自微信公众号《编程杂技》,作者theanarkh。转载本文请联系编程杂技公众号。前言:上个版本打开不方便,重新设计了nodejs的线程池库。本文介绍了该库的一些设计和实现。虽然nodejs提供了线程的能力,但是很多时候,直接使用线程或者无限创建线程往往是不行的。例如,我们有一个CPU密集型函数。如果一个请求开一个线程,这显然不是最好的做法,这时候,我们就需要用到池化技术。本文介绍如何设计和实现线程池库(https://github.com/theanarkh/nodejs-threadpool或npminodejs-threadpool)。下面是线程池的整体架构。设计一个线程池,在真正写代码之前需要考虑很多设计,如下:1.任务队列的设计,一个队列,多个线程互斥访问,或者一个线程一个队列,不互斥访问是必需的。2线程退出的设计,主线程可以检测到空闲线程,然后让子线程退出。或者子线程退出,通知主线程。空闲并不一定意味着没有任务就退出。可以设计空闲时间达到阈值后退出,因为创建线程有时间开销。3任务个数的设计,每个线程可以有多个任务,也可以加上任务总数,即所有线程的任务个数相加4选择线程的设计,选择任务数量最少的线程。5种螺纹类型设计,区分核心螺纹和预备螺纹。当任务较少时,核心线程可以处理。如果任务比较多,也会创建一个准备线程来帮助处理。6线程池式设计,cpu密集型,线程数等于核心数,否则可以自定义线程数。7支持任务取消和超时机制,防止任务执行时间过长或死循环。本文介绍的线程池的具体设计思路如下(参考java):1.主线程维护一个队列,子线程的任务由子线程分发。没有互斥访问,子线程不需要维护自己的队列。2线程退出的设计,主线程负责检查子线程的空闲时间是否达到阈值,如果达到则子线程退出。3任务数的设计,主线程负责管理任务数,应该有相应的策略。4选择线程的设计,选择任务数最少的线程。5、线程类型的设计区分了核心线程和预备线程。当任务较少时,核心线程可以处理。如果任务比较多,也会创建一个准备线程来帮助处理。6线程池式设计,cpu密集型,线程数等于核心数,否则可以自定义线程数。7支持任务取消和超时机制。当超时或取消时,主线程判断任务是要执行还是正在执行。如果要执行,则从任务队列中删除。如果正在执行,则相应的子线程会被杀死。我们来看看具体的设计。1主线程与子线程通信的数据结构//task类,一个task对应一个idclassWork{constructor({workId,filename,options}){//taskidthis.workId=workId;//任务逻辑,字符串或js文件Paththis.filename=filename;//任务返回的结果this.data=null;//任务返回的错误this.error=null;//执行时传入的参数task,user-definedthis.options=options;}}主线程给子线程分配任务时,会向子线程发送一个Work对象。在nodejs中,线程间通信需要序列化和反序列化,所以通信数据结构不能包含太多信息。2子线程处理任务逻辑const{parentPort}=require('worker_threads');constvm=require('vm');const{isFunction,isJSFile}=require('./utils');//监听主线程提交任务parentPort.on('message',async(work)=>{try{const{filename,options}=work;letaFunction;if(isJSFile(filename)){aFunction=require(filename);}else{aFunction=vm.runInThisContext(`(${filename})`);}if(!isFunction(aFunction)){thrownewError('worktypeerror:jsfileorstring');}work.data=awaitaFunction(options);parentPort.postMessage({event:'完成',工作});}catch(error){work.error=error.toString();parentPort.postMessage({event:'error',work});}});process.on('uncaughtException',(...rest)=>{console.error(...rest);});process.on('unhandledRejection',(...rest)=>{console.error(...rest);});子线程的逻辑比较简单,就是监听主线程分配的任务,然后执行任务,执行完通知主线程。任务支持js文件和字符串代码的形式。需要返回一个Promise或异步函数。用于通知主线程任务已经完成。3线程池与业务通信//提供给用户端的接口classUserWorkextendsEventEmitter{constructor({workId}){super();//任务idthis.workId=workId;//支持超时取消任务this.timer=null;//任务状态this.state=WORK_STATE.PENDDING;}//超时后取消任务setTimeout(timeout){this.timer=setTimeout(()=>{this.timer&&this.cancel()&&this.emit('timeout');},~~timeout);}//取消之前设置的定时器clearTimeout(){clearTimeout(this.timer);this.timer=null;}//直接取消任务,如果已经执行,则无法取消,这。终止是动态设置的cancel(){if(this.state===WORK_STATE.END||this.state===WORK_STATE.CANCELED){returnfalse;}else{this.terminate();returntrue;}}//修改任务状态setState(state){this.state=state;}}当业务向线程池提交任务时,线程池会返回一个UserWork类,业务方通过UserWork类与线程池通信.4管理子线程的数据结构//管理子线程的数据结构classThread{constructor({worker}){//nodejs的Worker对象,nodejs的worker_threads模块的Workerthis.worker=worker;//线程状态this.state=THREAD_STATE.IDLE;//最后工作时间this.lastWorkTime=Date.now();}//修改线程状态setState(state){this.state=state;}//修改最后工作时间线程的setLastWorkTime(time){this.lastWorkTime=time;}}线程池中维护了多个子线程,Thread类用于管理子线程的信息。5线程池线程池的实现是核心,我们分为几个部分。5.1支持配置constructor(options={}){this.options=options;//子线程队列this.workerQueue=[];//核心线程数this.coreThreads=~~options.coreThreads||config.CORE_THREADS;//线程池中的最大线程数。如果不支持动态扩展,最大线程数等于核心线程数this.maxThreads=options.expansion!==false?Math.max(this.coreThreads,config.MAX_THREADS):this.coreThreads;//超过任务队列长度时的处理策略this.discardPolicy=options.discardPolicy?options.discardPolicy:DISCARD_POLICY.NOT_DISCARD;//是否预创建子线程this.preCreate=options.preCreate===true;//最大空闲线程时间,达到后自动退出this.maxIdleTime=~~options.maxIdleTime||config.MAX_IDLE_TIME;//是否预创建线程池this.preCreate&&this.preCreateThreads();//保存UserWork对应到线程池中的任务this.workPool={};//线程池中当前可用的任务id,每有一个新任务就加1this.workId=0;//线程池中的任务队列this.queue=[];//线程池的任务总数this.totalWork=0;//支持的最大任务数this.maxWork=~~options.maxWork||config.MAX_WORK;//处理任务的超时时间,全局配置this.timeout=~~options.timeout;this.pollIdle();}上面的代码列出了t支持的能力线程池。5.2创建线程newThread(){constworker=newWorker(workerPath);constthread=newThread({worker});this.workerQueue.push(thread);constthreadId=worker.threadId;worker.on('exit',()=>{//找到线程对应的数据结构,然后删除线程的数据结构constposition=this.workerQueue.findIndex(({worker})=>{returnworker.threadId===threadId;});constexitedThread=this.workerQueue.splice(position,1);//退出状态为BUSY,表示任务还在处理中(异常退出)this.totalWork-=exitedThread.state===THREAD_STATE.BUSY?1:0;});//与子线程通信worker.on('message',(result)=>{const{work,event,}=result;const{data,error,workId}=work;//获取对应的userWork通过workIdconstuserWork=this.workPool[workId];//不存在则取消任务if(!userWork){return;}//修改线程池数据结构this.endWork(userWork);//修改线程数据结构thread.setLastWorkTime(Date.now());//如果有还是一个任务,通知子线程处理,否则修改子线程的状态为idleif(this.queue.length){//从任务队列中获取一个任务交给子线程this.submitWorkToThread(thread,this.queue.shift());}else{thread.setState(THREAD_STATE.IDLE);}switch(event){case'done'://通知用户任务完成userWork。emit('done',data);break;case'error'://通知用户任务出错if(EventEmitter.listenerCount(userWork,'error')){userWork.emit('error',error);}break;default:break;}});worker.on('error',(...rest)=>{console.error(...rest);});returnthread;}??创建线程,并保留线程对应的数据结构,退出,通信管理,任务派发。子线程完成任务后通知线程池,主线程通知用户。5.3SelectThreadselectThead(){//找出空闲线程,将任务交给他for(leti=0;i
