当前位置: 首页 > 后端技术 > Node.js

nodejs中的并发编程

时间:2023-04-03 12:50:00 Node.js

从sleep的实现说起在nodejs中,如果要实现sleep的功能,主要是通过“setTimeout+promise”来实现,也可以通过“loopidling”来解决。前者使用定时器实现任务的延迟执行,通过promise链管理任务间的时序和依赖关系。本质上nodejs的执行线程并没有真正休眠,eventloop和v8还在运行,只是体现在业务逻辑上。睡觉;后者的实现无疑是对CPU性能的浪费,有点类似于自旋锁,不满足大部分场景。为了在引擎级别(运行时)实现睡眠,在ECMAScriptLatestDraft(ECMA-262)出现后,事情开始出现转机。ECMA262规定了Atomics.wait,将调用该方法的代理(引擎)放入等待队列,使其休眠,直到收到通知或超时。本规范在8.10.0以上的nodejs版本上实现。其实Atomics.wait的出现主要解决了浏览器或者nodejsworker之间的数据同步问题。浏览器上的web-worker和nodejs@12官方包含的worker-threads模块都是ECMAScript多线程模型的具体实现。既然是多线程,就免不了提到线程间的同步,可以在前端和nodejs中使用Atomics.wait和notify来解决。说的有点跑题了,回到本节,如何实现运行时休眠呢?很简单,利用Atomics.wait的等待超时机制:letsharedBuf=newSharedArrayBuffer(4);letsharedArr=newInt32Array(sharedBuf);//sleepn秒letsleep=function(n){Atomics.wait(sharedArr,0,0,n*1000);}这里的sleep不是异步方法,它会阻塞执行线程直到超时,所以需要根据业务场景使用休眠模型。下面会解释Atomics.wait的具体用法。多线程同步虽然nodejs的多线程使用场景不多,但是一旦涉及到多线程,线程之间的同步是必不可少的,否则临界区的问题是解决不了的。但是,nodejs的work_threads创建线程的方式与c或java不同。它使用libuv的API来创建线程“uv_thread_create”,但是在这之前,它需要初始化MessagePort、v8实例设置等一些设施,所以创建线程并不是一个轻量级的操作,需要创建一个根据场景适量线程。回到正题,多线程之间的同步一般需要依赖锁,而锁的实现需要依赖全局变量。在nodejs的work_threads实现中,主线程不能设置全局变量,所以可以通过Atomics来实现。如上例所示,Atomics.wait依赖于SharedArrayBuffer,它是一个共享内存的ArrayBuffer,线程可以通过SharedArrayBuffer共享数据。在实际操作ArrayBuffer的时候,并不是直接使用对象,而是TypeArray。比如Atomics.wait,第一个参数必须是一个Int32Array对象,这个对象指向的缓冲区是SharedArrayBuffer。当线程A因为Atomics.wait被阻塞时,可以通过其他线程B调用Atomics.notify将其唤醒,从而使线程A的v8继续执行。让{Worker,isMainThread,parentPort,workerData}=require('worker_threads');varsab=newSharedArrayBuffer(1024);varint32=newInt32Array(sab);如果(isMainThread){constworker=newWorker(__filename,{workerData:sab});worker.on('message',(d)=>{console.log('parentreceivemessage:',d);});worker.on('error',(e)=>{console.error('parentreceiveerror',e);});worker.on('exit',(code)=>{if(code!==0)console.error(newError(`WorkerthreadusingExitcode${code}STOP`));});原子.wait(int32,0,0);//控制台日志(int32[0]);//C:123}else{让buf=workerData;让arrs=newInt32Array(buf);Atomics.store(arrs,0,123);Atomics.notify(arrs,0);//B}上面的例子中,主线程创建线程后,阻塞在A处;在新线程中,通过原子操作Atomics修改SharedArrayBuffer的第一项为123后。]),输出新线程修改的SharedArrayBuffer的第一个数据123。锁解析是公平排他不可重入锁的实现,使用Atomics.wait/notify/compareExchange完成线程同步。main-thread.jsletLock=require('./lock').Lock;let{Worker}=require('worker_threads');constsharedBuffer=newSharedArrayBuffer(1*Int32Array.BYTES_PER_ELEMENT);常量sharedArray=newInt32Array(sharedBuffer);letworker=newWorker('./worker-lock.js',{workerData:sharedBuffer});Lock.initialize(sharedArray,0);constlock=newLock(sharedArray,0);//获取锁。锁();//3s后释放锁setTimeout(()=>{lock.unlock();//(B)},3000)worker-thread.jsletLock=require('./lock').Lock;let{parentPort,workerData}=require('worker_threads');constsharedArray=newInt32Array(workerData);constlock=newLock(sharedArray,0);console.log('等待锁定...');//(A)//获取锁lock.lock();//(B)blocks!console.log('Unlocked');//(C)主线程初始化互斥量,同时创建一个线程,主线程获得锁后三秒内释放;工作线程尝试获取锁。这个时候锁已经被主线程获取到了,所以工作线程就阻塞在这里了。等待3秒后,主线程释放锁苏醒,继续执行输出。lock.jsconstUNLOCKED=0;constLOCKED_NO_WAITERS=1;constLOCKED_POSSIBLE_WAITERS=2;constNUMINTS=1;classLock{//'iab'必须是Int32Array映射共享内存。//'ibase'必须是iab中的有效索引,为锁保留的NUMINTS中的第一个。constructor(iab,ibase){if(!(iabinstanceofInt32Array&&ibase|0===ibase&&ibase>=0&&ibase+NUMINTS<=iab.length)){thrownewError(`BadargumentstoLockconstructor:${iab}${ibase}`);}这个.iab=iab;这个.ibase=ibase;}staticinitialize(iab,ibase){if(!(iabinstanceofInt32Array&&ibase|0===ibase&&ibase>=0&&ibase+NUMINTS<=iab.length)){thrownewError(`BadargumentstoLock构造函数:${iab}${ibase}`);}Atomics.store(iab,ibase,UNLOCKED);返回数据库;}//获取锁,或者阻塞直到我们可以。锁定不是递归的:lock(){constiab=this.iab;conststateIdx=this.ibase;变种C;if((c=Atomics.compareExchange(iab,stateIdx,UNLOCKED,LOCKED_NO_WAITERS))!==UNLOCKED){//Ado{if(c===LOCKED_POSSIBLE_WAITERS||Atomics.compareExchange(iab,stateIdx,LOCKED_NO_WAITERS,LOCKED_POSSIBLE_WAITERS)!==UNLOCKED){Atomics.wait(iab,stateIdx,LOCKED_POSSIBLE_WAITERS,Number.POSITIVE_INFINITY);while((c=Atomics.compareExchange(iab,stateIdx,UNLOCKED,LOCKED_POSSIBLE_WAITERS))!==UNLOCKED);//B}}tryLock(){constiab=this.iab;conststateIdx=this.ibase;返回Atomics.compareExchange(iab,stateIdx,UNLOCKED,LOCKED_NO_WAITERS)===UNLOCKED;}unlock(){constiab=this.iab;conststateIdx=this.ibase;varv0=Atomics.sub(iab,stateIdx,1);//如果有则唤醒服务员if(v0!==LOCKED_NO_WAITERS){Atomics.store(iab,stateIdx,UNLOCKED);Atomics.notify(iab,stateIdx,1);}}toString(){return"Lock:{ibase:"+this.ibase+"}";}}exports.Lock=锁定;当进程A尝试获取锁成功时,A处的判断语句为false,compareExchange将state设置为LOCKED_NO_WAITERS,直接执行其后续逻辑;如果此时进程B执行lock获取锁,A判断为真,进入dowhile循环体,休眠wait;进程A通过unlock释放锁,会将锁状态设置为UNLOCKED并唤醒同时被阻塞的进程B;进程B执行循环判断语句B,此时为假,跳出循环执行B的逻辑。自旋锁或其他逻辑实现非阻塞等待。参考libuvtalkAtomicsAtomicsMDN的线程