当前位置: 首页 > 后端技术 > Node.js

为了更优雅的用JS进行“IPC”调用,我写了event-invoke库

时间:2023-04-04 00:30:59 Node.js

后台团队最近有一个新的Node.js模块需要开发,涉及到多进程管理和通信。简化模型可以理解为需要频繁的从master进程调用worker进程的一些方法,简单的设计实现为一个event-invoke库,可以简单优雅的调用。Node.js提供了child_process模块??,可以创建一个worker进程,并通过在master进程中调用fork/spawn等方法获取其对象(简称cp)。父子进程会建立一个IPC通道。在master进程中,可以使用cp.send()向worker进程发送IPC消息,在worker进程中,也可以使用process.send()向父进程发送IPC消息,实现双工通信。目的。(进程管理涉及到比较复杂的工作,本文不做介绍。)最小实现是基于以上前提。借助IPC通道和进程对象,我们可以通过事件驱动的方式实现进程间通信。只需要几行代码。实现基本的调用逻辑,例如://master.jsconstchild_process=require('child_process');constcp=child_process.fork('./worker.js');functioninvoke(){cp.send({name:'methodA',args:[]});cp.on('message',(packet)=>{console.log('result:%j',packet.payload);});}invoke();//worker.jsconstmethodMap={methodA(){}}cp.on('message',async(packet)=>{const{name,args}=packet;constresult=awaitmethodMap[name)(...args);process.send({name,payload:result});});仔细分析上面的代码实现,直观感觉invoke调用不够优雅,调用量大的时候会创建很多消息监听器,必须保证请求和响应是一一对应的,需要大量额外的设计。希望设计一个简单理想的方式,只提供invoke方法,传入方法名和参数,返回一个Promise,像调用本地方法一样进行IPC调用,不考虑消息通信的细节。//假设的IPC调用constres1=awaitinvoker.invoke('sleep',1000);console.log('睡眠1000ms:',res1);constres2=awaitinvoker.invoke('max',[1,2,3]);//3console.log('max(1,2,3):',res2);流程设计从调用模型的角度,可以将角色抽象为Invoker和Callee,对应服务调用者和Provider,内部封装消息通信的细节。parent_process和child_process之间的通信桥梁是操作系统提供的IPC通道。从API的角度,可以简化为两个Event对象(主进程是cp,子进程是process)。Event对象作为中间双工通道的两端,暂且命名为InvokerChannel和CalleeChannel。关键实体和流程如下:在Callee中注册所有可以调用的方法,保存在functionMap中当用户调用Invoker.invoke()时:创建一个promise对象,返回给用户,保存在promiseMap每次调用都会生成一个id,保证调用和执行结果一一对应进行超时控制,超时任务直接执行message,通过name执行对应的方法,比较结果和Completion状态(成功或异常)通过Channel发送消息给InvokerInvoker解析消息,通过id+name找到对应的promise对象,如果成功则resolve,以及失败则拒绝其实这种设计不仅适用于IPC调用,在浏览器场景下也可以直接很好的应用。例如iframe间调用可以包裹window.postMessage(),跨表调用可以使用存储事件,worker.postMessage()可以作为Webworker中的沟通桥梁。快速上手基于以上设计,必然要实现编码,在非工作时间快速完成开发和文档工作。源码:https://github.com/x-cold/event-invoke安装依赖npmi-Sevent-invoke父子进程通信实例示例代码:示例代码//parent.jsconstcp=require('child_process');const{Invoker}=require('event-invoke');constinvokerChannel=cp.fork('./child.js');constinvoker=newInvoker(invokerChannel);asyncfunctionmain(){constres1=awaitinvoker.invoke('sleep',1000);console.log('睡眠1000ms:',res1);constres2=awaitinvoker.invoke('max',[1,2,3]);//3console.log('max(1,2,3):',res2);invoker.destroy();}main();//child.jsconst{Callee}=require('event-invoke');constcalleeChannel=进程;constcallee=newCallee(calleeChannel);//异步方法callee.register(asyncfunctionsleep(ms){returnnewPromise((resolve)=>{setTimeout(resolve,ms);});});//syncmethodcallee.register(functionmax(...args){returnMath.max(...args);});callee.listen();自定义Channel实现PM2进程间调用示例代码:Examplecode//pm2.config.cjsmodule.exports={apps:[{script:'invoker.js',name:'invoker',exec_mode:'fork',},{script:'callee.js',name:'callee',exec_mode:'fork',}],};//callee.jsimportnetfrom'net';importpm2from'pm2';import{Callee,BaseCalleeChannel}from'event-invoke';constmessageType='event-invoke';constmessageTopic='sometopic';classCalleeChannelextendsBaseCalleeChannel{constructor(){super();this._onProcessMessage=this.onProcessMessage.bind(this);process.on('消息',this._onProcessMessage);}onProcessMessage(packet){if(packet.type!==messageType){返回;}this.emit('消息',packet.data);}send(data){pm2.list((err,processes)=>{if(err){throwerr;}constlist=processes.filter(p=>p.name==='invoker');constpmId=list[0].pm2_env.pm_id;pm2.sendDataToProcessId({id:pmId,类型:messageType,topic:messageTopic,data,},function(err,res){if(err){抛出错误;}});});}destory(){process.off('message',this._onProcessMessage);}}constchannel=newCalleeChannel();constcallee=newCallee(channel);//asyncmethodcallee.register(asyncfunctionsleep(ms){returnnewPromise((resolve)=>{setTimeout(resolve,ms);});});//同步方法callee.register(functionmax(...args){returnMath.max(...args);});callee.listen();//让你的进程保持活跃net.createServer().listen();//invoker.jsimportpm2from'pm2';import{Invoker,BaseInvokerChannel}from'event-invoke';constmessageType='event-invoke';constmessageTopic='sometopic';classInvokerChannel扩展BaseInvokerChannel{constructor(){super();this._onProcessMessage=this.onProcessMessage.bind(this);process.on('消息',this._onProcessMessage);}onProcessMessage(packet){if(packet.type!==messageType){返回;}this.emit('消息',packet.data);}send(data){pm2.list((err,processes)=>{if(err){throwerr;}constlist=processes.filter(p=>p.name==='callee');constpmId=list[0].pm2_env.pm_id;pm2.sendDataToProcessId({id:pmId,type:messageType,topic:messageTopic,data,},function(err,res){if(err){throwerr;}});复制代码});}connect(){this.connected=true;}disconnect(){this.connected=false;}destory(){process.off('message',this._onProcessMessage);}}constchannel=newInvokerChannel();channel.connect();constinvoker=newInvoker(channel);setInterval(async()=>{constres1=awaitinvoker.invoke('sleep',1000);控制台。log('sleep1000ms:',res1);constres2=awaitinvoker.invoke('max',[1,2,3]);//3console.log('max(1,2,3):',res2);},5*1000);下一步目标前event-invoke配备了优雅调用“IPC”调用的基础能力,代码覆盖率100%,提供了比较完整的类型描述。感兴趣的同学可以直接使用。有问题可以直接提交Issue。其他一些未来仍需持续完善的部分:示例更丰富,涵盖跨Iframe、交叉tab、Webworker等使用场景。提供开箱即用的通用Channel,异常更友好处理