当前位置: 首页 > 后端技术 > Node.js

NodeJs流之一详解

时间:2023-04-03 11:46:03 Node.js

如果对NodeJs系列感兴趣,请关注微信公众号:前端S.H.I.E.L.D.或者githubNodeJs系列文章流出早期的unix初出茅庐,在过去的几十年里,它被证明是一种可靠的编程风格,它可以将一个大系统分解成一些小的部分,并让这些部分完美的配合。在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的控制台,都极有可能涉及到流。Node.js中有四种基本流类型:可读-可以从中读取数据的流(例如fs.createReadStream())。可写-可以写入数据的流(例如fs.createWriteStream())。双工-可读可写流(例如net.Socket)。Transform-Duplexstreams,可以在读写过程中修改或转换数据(比如zlib.createDeflate())constfs=require('fs');http.createServer((req,res)=>{fs.readFile('./test.html',function(err,data){if(err){res.statusCode=500;res.end();}else{res.end(data);}})}).listen(3000)上面的代码简单的实现了静态文件的读取和发送,逻辑上是完全可行的。但是,由于readFile是将读取的文件一次性存储在内存中,如果test.html文件很大或者访问次数增多,可能会耗尽服务器内存。这时候就需要使用流来改进:consthttp=require('http');constfs=require('fs');http.createServer((req,res)=>{fs.createReadStream('./test.html').pipe(res);}).listen(3000);fs.createReadStream创建可读流,并逐一读取文件内容供下游消费。这种逐渐读取和消耗的方式有效地减缓了内存消耗。可读流(ReadableStream)我们可以将可读流分为两个阶段:推阶段和拉阶段。在push阶段,通过实现_read方法将数据从底层数据资源池push到bufferpool中。这是数据的生产阶段,拉取阶段是从缓冲池中拉出数据供下游使用,也就是数据的消费阶段。在开始进一步解释之前,先介绍几个字段,这些字段来源于节点源码:data,在objectMode模式下,state.length===state.buffer.length,否则,其值为state.buffer中数据字节的总和state.ended:Boolean表示底层数据池没有可读数据(this.pull(null))state.flowing:Null|Boolean表示当前流模式,其值有三种情况:null(初始状态)、true(流模式)、false(暂停模式)state.needReadable:Boolean是否触发readable事件state.reading:Boolean是否正在读取底层数据state.sync:Boolean是否立即触发数据/可读事件,false为立即触发,true为nexttick再触发(process.nextTick)两种模式有两种可读流模式:流模式(flowing)和暂停模式(paused),在源码中使用state.flowing来标识。两种模式的基本流程遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流模式,只要缓存池中还有未被消费的数据,就会源源不断地提取数据。我们可以把它想象成一个自动水泵。只要通电,它就不会把水池里的水放掉。停了。对于pause模式来说,更像是拿个水桶,需要的时候去池子里打点水。所有可读流以暂停模式启动,可以通过添加数据事件处理程序(提供state.flowing===null)调用stream.resume()调用stream.pipe()可读流也切换回暂停模式模式可以通过添加可读事件处理程序并在没有管道目标时调用stream.pause()来完成。删除所有管道目标(如果有)。可以通过调用stream.unpipe()删除多个管道目标。一切从阅读开始。对于可读流,消费驱动生产。只有在pull阶段调用read函数,才能唤醒push阶段的数据生成,从而带动整个流的运动。所以read是可读流一切的起点。这是按照源码整理的简单流程图,后面会解释一些链接。howMuchToRead调用read(n)时,node会根据实际情况调整读取次数,实际值由howMuchRead函数决定howMuchToRead(n,state){//如果size<=0或者没有可读数据如果(n<=0||(state.length===0&&state.ended))返回0;//objectMode模式下,每次创建一个单位长度的数据if(state.objectMode)return1;//如果不指定sizeif(Number.isNaN(n)){//在执行read()时,由于流式会连续输出数据,//所以每次只输出缓存中的第一个元素时间,而非流模式将读取缓存空if(state.flowing&&state.length)returnstate.buffer.head.data.length;否则返回state.length;}if(n>state.highWaterMark)//更新highWaterMarkstate.highWaterMark=computeNewHighWaterMark(n);//如果缓存中的数据量足够if(n<=state.length)returnn;//如果缓存中的数据不够,//并且资源池中还有可读的数据,那么这次不要读取缓存的数据//保存数据,等下次有足够的数据时//否则,读取空缓冲池if(!state.ended){state.needReadable=true;返回0;}returnstate.length;}end在read函数的事件调用过程中,node会择机决定是否触发end事件。判断标准主要是以下两个条件:if(state.length===0&&state.ended)endReadable(这);底层数据(资源)没有可读数据。此时state.ended为真。通过调用pull(null),表示底层数据当前没有可读数据。缓冲池中没有可读数据。state.length===0调用read([size])时触发该事件(满足上述条件时)doReaddoRead用于判断是否读取底层数据//如果当前状态为暂停模式`state.needReadable`vardoRead=state.needReadable;//如果当前缓冲池为空或者没有足够的缓冲if(state.length===0||state.length-n{console.log(`Received${chunk.length}数据字节。`);});运行这个例子,我们发现终端没有任何输出,这是为什么?原因我们从源码可以看出if(state.flowing!==false)this.resume();由此我们可以改进官方的说法:在可读流初始化状态(state.flowing===null)下,添加数据事件处理程序将流置于流动模式。push只能由可读流的实现调用,并且只能在readable._read()方法中调用。Push是数据生产的核心。消费者通过调用read(n)提示流输出数据,流使用_read()让底层调用push方法向流传输数据。在这个过程中,push方法可能会将数据存储在缓冲池中,也可能直接通过data事件输出。让我们一一分析。如果当前流是流动的(state.flowing===true),缓冲池中没有可读数据,那么数据将直接由事件数据输出//节点源码if(state.flowing&&state.length===0&&!state.sync){state.awaitDrain=0;stream.emit('data',chunk);}让我们举个例子:const{Readable}=require('stream');classExampleReadableextendsReadable{constructor(opt){super(opt);这个.max=100;这次.time=0;}_read(){constseed=setTimeout(()=>{if(this.time>100){this.push(null);}else{this.push(String(++this.time));}clearTimeout(种子);},0)}}constexampleReadable=newExampleReadable({});exampleReadable.on('data',(data)=>{console.log('fromdata',data);});可读事件示例Readable.on('readable',()=>{....});当我们注册一个可读事件时,node会做如下处理:将流切换到暂停模式state.flowing=false;state.needReadable=true;如果缓冲池中的数据没有被消费,则触发可读,stream.emit('readable');否则,判断当前是否正在读取底层数据,如果没有,则开始(nextTick)读取底层数据self.read(0);触发条件state.flow===false当前在暂停模式缓存中池中还有数据或本轮底层数据已读取state.length||state.endedreturn!state.ended&&(state.length