当前位置: 首页 > 后端技术 > Node.js

Node.js源码分析-pipe实现

时间:2023-04-03 20:09:36 Node.js

Node.js源码分析-pipe实现欢迎来到我的博客阅读:《Node.js源码解析-pipe实现》从前两篇文章中学习。如果要将Readable数据写入Writable,必须先手动将数据读入内存,然后再写入Writable。也就是说,每次传递数据,都需要写如下模板代码readable.on('readable',(err)=>{if(err)throwerrwritable.write(readable.read())})为了方便使用,Node.js提供了pipe()方法,方便我们优雅的传输数据。readable.pipe(writable)现在,让我们看看它是如何实现的。Pipe首先需要调用Readable方法的pipe()//lib/_stream_readable.jsReadable.prototype.pipe=function(dest,pipeOpts){varsrc=this;varstate=this._readableState;//记录可写开关(state.pipesCount){case0:state.pipes=dest;休息;案例一:state.pipes=[state.pipes,dest];休息;默认值:state.pipes.push(dest);休息;}state.pipesCount+=1;//...src.once('end',endFn);dest.on('unpipe',onunpipe);//...dest.on('drain',ondrain);//...src.on('data',ondata);//...//确保在触发error事件时,先执行onerrorprependListener(dest,'error',onerror);//...dest.once('close',onclose);//...dest.once('完成',onfinish);//...//触发Writable的管道事件dest.emit('pipe',src);//发送R将eadable更改为流模式if(!state.flowing){debug('piperesume');src.resume();}返回目标;};执行pipe()函数时,先将Writable记录到state.pipes中,然后绑定相关事件,最后如果Readable不是流模式,则调用resume()将Readable改为流模式传递数据。Readable从数据源获取数据后,触发data事件并执行ondata()ondata()相关代码://lib/_stream_readable.js//防止在dest.write(chunk)中调用src.push(chunk)导致重复增加awaitDrain,awaitDrain无法清除,Readable卡住//详见https://github.com/nodejs/node/issues/7278varincreasedAwaitDrain=false;函数ondata(块){调试('ondata');increasedAwaitDrain=false;varret=dest.write(块);if(false===ret&&!increasedAwaitDrain){//防止在dest.write()中调用src.unpipe(dest),导致awaitDrain没有被清除导??致Readable卡住if(((state.pipesCount===1&&state.pipes===dest)||(state.pipesCount>1&&state.pipes.indexOf(dest)!==-1))&&!cleanedUp){debug('falsewriteresponse,暂停',src._readableState.awaitDrain);src._readableState.awaitDrain++;增加等待排水=真;}//进入暂停模式src.pause();在ondata(chunk)函数中,通过dest.write(chunk)写入Writable时,可能会调用src.push(chunk),或者在_write()内部unpipe,会导致awaitDrain增加多次,不能被清空,Readable卡住,当数据不能再写入Writable时,Readable会进入暂停模式,直到所有drain事件触发,触发drain事件,执行ondrain()//lib/_stream_readable.jsvarondrain=pipeOnDrain(src);functionpipeOnDrain(src){returnfunction(){varstate=src._readableState;debug('pipeOnDrain',state.awaitDrain);如果(state.awaitDrain)state.awaitDrain--;//awaitDrain===0,并且有一个数据监听器if(state.awaitDrain===0&&EE.listenerCount(src,'data')){state.flowing=true;流(来源);}};}每次触发drain事件时,awaitDrain都会递减,直到awaitDrain为0此时调用flow(src)让Readable进入流模式。至此,整个数据传输周期已经建立,数据会沿着周期不断流入Writable,直到所有数据写入完成。unpipe不管写入过程中是否有错误,最终都会执行unpipe()//lib/_stream_readable.js//...functionunpipe(){debug('unpipe');src.unpipe(目标);}//...Readable.prototype.unpipe=function(dest){varstate=this._readableState;varunpipeInfo={hasUnpiped:false};//什么都没有if(state.pipesCount===0)returnthis;//只有一个if(state.pipesCount===1){if(dest&&dest!==state.pipes)returnthis;//unpipeall如果没有指定if(!dest)dest=state.pipes;state.pipes=null;state.pipesCount=0;状态.flowing=false;if(dest)dest.emit('unpipe',this,unpipeInfo);归还这个;}//unpipeifnotspecifiedallif(!dest){vardests=state.pipes;varlen=state.pipesCount;state.pipes=null;state.pipesCount=0;state.flowing=false;for(vari=0;i