当前位置: 首页 > 后端技术 > Node.js

Koa-spring:Node进程通信实践

时间:2023-04-03 20:38:46 Node.js

关于本文Node多进程进程通信闭包和立即执行有兴趣的可以fork项目,自己体验Koa-spring:https://github.com/closertb/k...related-client:https://github.com/closertb/k...技术栈:koa+Sequelize+routing-controllers+typescriptNode多进程进程和线程进程:是程序中的计算机关于某个数据集的运行活动是系统资源分配和调度的基本单位,是操作系统结构的基础。进程是线程容器(来自维基百科),进程是资源分配的最小单位。每个进程都有自己独立的空间地址和数据栈。一个进程不能访问另一个进程中定义的变量和数据结构。只有建立了IPC通信,进程之间才能共享数据。线程:线程是操作系统能够进行操作调度的最小单位。首先我们要明白,线程属于进程,包含在进程中。一个线程只能属于一个进程,但一个进程可以有多个线程。这里只是一个概念。有很多关于Node进程和线程的文章。这里推荐一下(很多文章都老了,但都是精髓):Node.js探秘:当我们谈集群的时候认识单线程Node.jsWhatareustalkingabout(Part1)Whatarewetalking关于当我们谈论集群时(第2部分)另外,我推荐ParkLing:深入浅出地解释NodeJs。解决了Node.js在单进程单线程模式下CPU利用率不足的问题,充分利用了多核CPU的性能。进程间内存隔离,无法共享数据,需要通信;进程之间的通信是异步的;流程通信需求是这样的,第一篇文章提到的认证中间件,系统登录后,我需要缓存用户的登录状态。下次用户发起请求时,我直接根据缓存检查token是否有效。一开始我的系统是单进程的,操作很简单:当用户登录时,缓存缓存,cache.put(id,user,expirations);认证时,根据请求中携带的id和token,检查是否与缓存中缓存的一致:token===cache.get(id).token;简单、粗暴、高效,直到我开启了集群模式的多进程,让程序更加健壮。很显然,上面的方案已经行不通了,进程间的缓存是有不同的内存块的,所以改变解决方案的唯一办法就是使用进程间通信,而不是使用redis。下面是一个简单的示意图:简单的说,主进程和工作进程之间的IPC通道用于通信。登录成功后,工作进程将认证信息发送给主进程,主进程存储。下次请求认证时,工作进程向主进程发送读取缓存的消息,并开始监听;主进程监听此消息,读取缓存中的认证信息,然后发送给工作进程;工作进程收到后,判断携带的token是否与主进程缓存的token一致。如果一致,则认证通过。部分示例代码://index.tsconstworker=cluster.fork();//监听消息事件worker.on('message',(msg:ActionBody)=>{const{type,payload}=msg;if(type=='readCache'){const{uid,id}=payload;constres=cache.get(id)||{};worker.send({type:'sendCache',uid,payload:res})}if(type=='saveCache'){cache.put(payload.id,payload,ExpiredTime);}});//userController.ts,发起缓存认证信息消息process.send({type:'saveCache',payload:res});//AuthCheckMiddleWare.tsfunctionreadCache(id:string){returnnewPromise((res,rej)=>{process.on('message',(m)={if(m.uid===id){res(m.payload);}else{rej({});}});process.send({type:'readCache',payload:{id,uid:id}});});}constuser:any=awaitreadCache(uid);if(user&&user.token===token){ctx.user=用户;awaitnext();}else{ctx.body={code:120001,message:uid?'登录超时,请重新登录':'Pleaseloginfirst',status:'error'};}上面显示的是一个简单版的进程通信,低并发请求时,运行没有问题它,但如果你是老手,你会发现很多错误。闭包和立即执行函数上一节中显示的代码中有多少错误?简单列举一下:process.on重复订阅,即每次发起鉴权,都会添加一个订阅。就像浏览器的监听一样,重复订阅监听,就会重复响应;由于通信是异步的,订阅是同时发起的,并发越高,就会有响应堆积,所以维护一个响应队列是唯一的办法,是比较好的办法;在rejection的情况下,程序没有相应的错误处理;会不会出现主进程没有响应或者worker进程没有响应的情况?于是基于以上问题,对readCache函数做了如下改动:typeCallback=(m:ActionBody)=>boolean;函数generateUid(){constrandom=Math.floor(26*Math.random()+65);return`${Date.now()}-${String.fromCharCode(random)}`;}//回调队列constcallbackList:Array=[];process.on('message',(m:ActionBody={type:'sendCache'})=>{lettempList=callbackList.slice();tempList.forEach((callback,i)=>{callback=tempList[i];if(callback&&callback(m)){callbackList.splice(i,1);//成功处理响应或响应已过期,移除此回调;}});});functionaddCallback(callback:Callback){callbackList.push(callback);}functionreadCache(id:string){returnnewPromise((res,rej)=>{try{constuid=generateUid();addCallback(((uid:string)=>{status=false;timeout=setTimeout(()=>{//5秒超时读取,防止永久非回调,导致回调一直存在于回调列表中:可能性很小rej({message:'授权验证超时'});状态=真;},5000);return(m:ActionBody)=>{//必须在过期前响应if(!status&&m.uid===uid){clearTimeout(timeout);资源(m.payload);返回真;}返回状态;}})(uid));process.send({type:'readCache',payload:{id,uid}});}catch(错误){rej(错误);}});}OK看到了,针对上面提到的问题,做了如下改进:维护一个callbackList,用于响应回调列表;每收到一个响应,回调列表中的一个一个执行,如果响应成功,则从回调列表中删除;唯一的uid,保证消息发送和响应的一一对应;使用setTimeout逻辑处理超时响应;通讯优化完成,但为什么章节标题关闭和立即执行有关系?无意中遇到了闭包,为了长记性,特意用了这么一个称呼。事情是这样的,回调函数不是像上面那样写的,而是这样写的:try{constuid=generateUid();addCallback((m:ActionBody)=>{//必须在到期前响应if(m.uid===uid){res(m.payload);returntrue;}returnfalse;}});process.send({type:'readCache',payload:{id,uid}});}看起来没什么问题,不并发运行也没什么问题。但是当我提高并发量的时候,偶尔会报权限错误。上次排查,发现m.uid===uid这一行的uid和预期的不一致,被篡改成了下一次请求生成的uid(不好意思脑袋想了N分钟,呵呵,事实证明,关闭的锅)。解决闭包最好的方法是什么,立即执行函数,然后才有上面的写法。如果有兴趣恢复这个进程,可以在主进程的响应中加入500ms的延迟。至此,我对前端到Node服务器的探索告一段落,但是很多想做的事情都没有实现。我希望下次机会来临时,我有能力在我的服务中实现Graphql。使用Github的APIV4后,发现Graphql对架构的要求极高。系列索引Part1:Koa-spring:后端太忙,我自己写服务(Part1)Part2:Koa-spring:后端太忙,让我自己写服务(Part2)