通过流式处理,我们可以将一大块数据拆分成一小块,一点一点地流动,而不用一下子全部读取。在Linux下,我们可以通过|来实现symbol,同样的,在Nodejs的Stream模块中也为我们提供了pipe()方法。1.选择StreampipeKoa的基础例子来实现这个简单的demo,因为之前在《Nodejs技术栈》交流群里有人问了一个问题,Koa中如何返回一个Stream,我就借这个机会提一下顺便说一下。1.1Nodejs中的I/O操作在不使用Stream管道的情况下是异步的。首先使用util模块的promisify方法将fs.readFile的回调形式转换为Promise形式。这段代码看起来还行,但是它的体验不是很好,因为它是一次性把数据读到内存中,然后再返回。当数据文件很大时,也会消耗内存。内存泄露之类的问题也很容易出现,所以不推荐。constKoa=require('koa');constfs=require('fs');constapp=newKoa();const{promisify}=require('util');const{resolve}=require('path');constreadFile=promisify(fs.readFile);app.use(asyncctx=>{try{ctx.body=awaitreadFile(resolve(__dirname,'test.json'));}catch(err){ctx.body=err};});app.listen(3000);1.2使用Streampipe下面看看Koa框架中如何通过Stream响应数据...app.use(asyncctx=>{try{constreadable=fs.createReadStream(resolve(__dirname,'test.json'));ctx.body=readable;}catch(err){ctx.body=err};});上面直接在Koa中创建了一个可读流,赋值给ctx.body,你可能会疑惑为什么没有管道方法,因为框架已经帮你封装好了,不要被表象迷惑了,看相关源码://https://github.com/koajs/koa/blob/master/lib/application.js#L256functionrespond(ctx){...letbody=ctx.body;if(bodyinstanceofStream)returnbody.pipe(res);...}没有魔法,框架做一层判断返回的时候,因为res是一个可写的流对象。如果body也是一个Stream对象(此时Body是一个可读流),使用body.pipe(res)以流的方式响应。1.3使用StreamVS不使用Stream看一张图,不得不说图片太可爱了,来源https://www.cnblogs.com/vajoy/p/6349817.html二、调用过程及实现原理pipe的是最后流式响应数据的核心实现,就是使用pipe方法实现输入输出。本节的重点也是研究管道的实现。下面我们通过阅读源码来看看最好的打开方式。2.1在应用层,我们调用了fs.createReadStream()方法,找到了该方法创建的可读流对象的pipe方法的实现。下面仅列出核心代码实现,基于Nodejsv12.x源码。2.1.1/lib/fs.js导出了一个createReadStream方法,其中创建了一个ReadStream可读流对象,ReadStream来自于internal/fs/streams文件,继续往下看。//https://github.com/nodejs/node/blob/v12.x/??lib/fs.js//延迟加载,主要用于实例化ReadStream、WriteStream等对象...functionlazyLoadStreams(){if(!ReadStream){({ReadStream,WriteStream}=require('internal/fs/streams'));[FileReadStream,FileWriteStream]=[ReadStream,WriteStream];}}functioncreateReadStream(path,options){lazyLoadStreams();returnnewReadStream(路径,options);//创建一个可读流}module.exports=fs={createReadStream,//导出createReadStream方法...}2.1.2/lib/internal/fs/streams.js这个方法的构造函数ReadStream定义在它,以及open、_read和_destroy等方法都定义在原型上,但是并没有我们要找的pipe方法。但是,继承是通过ObjectSetPrototypeOf方法实现的。ReadStream继承了Readable原型中定义的函数,然后继续寻找Readable的实现。//https://github.com/nodejs/node/blob/v12.x/??lib/internal/fs/streams.jsconst{Readable,Writable}=require('stream');functionReadStream(path,options){if(!(thisinstanceofReadStream))returnnewReadStream(path,options);...Readable.call(this,options);...}ObjectSetPrototypeOf(ReadStream.prototype,Readable.prototype);ObjectSetPrototypeOf(ReadStream,Readable);ReadStream.prototype.open=function(){...};ReadStream.prototype._read=function(n){...};;ReadStream.prototype._destroy=function(err,cb){...};...module.exports={ReadStream,WriteStream};2.1.3/lib/stream.js在stream.js的实现中,有注释:ImportStreambeforeReadable/Writable/Duplex/...原因是为了避免交叉-reference(require),为什么会这样?第一步stream.js将require('internal/streams/legacy')export复制到Stream。下面的_stream_readable、Writable、Duplex...模块也会依次引用stream.js文件,具体实现见下文。Stream导入internal/streams/legacy上面的/lib/internal/fs/streams.js文件,从stream模块中获取一个Readable对象,也就是下面的Stream.Readable的定义。//https://github.com/nodejs/node/blob/v12.x/??lib/stream.js//注意:exportStreambeforeReadable/Writable/Duplex/...//避免交叉引用(require)issuesconstStream=module。exports=require('internal/streams/legacy');Stream.Readable=require('_stream_readable');Stream.Writable=require('_stream_writable');Stream.Duplex=require('_stream_duplex');Stream.Transform=require('_stream_transform');Stream.PassThrough=require('_stream_passthrough');...2.1.4/lib/internal/streams/legacy.js上面的Stream等于internal/streams/legacy,首先继承Eventsmodule,之后在prototype上定义了pipe方法。刚开始看到这个的时候以为实现就到这里了,但是看了_stream_readable的实现,发现_stream_readable继承了Stream,然后重新实现了pipe方法。于是疑惑这个模块的pipe方法是干什么用的?什么时候用?翻译文件名“legacy=legacy”?不太明白,是不是留下来了?有清楚大佬的可以指点一下,也欢迎加我微信在公众号《Nodejs技术栈》后台一起讨论!//https://github.com/nodejs/node/blob/v12.x/??lib/internal/streams/legacy.jsconst{ObjectSetPrototypeOf,}=primordials;constEE=require('events');functionStream(opts){EE。调用(这个,选择);}ObjectSetPrototypeOf(Stream.prototype,EE.prototype);ObjectSetPrototypeOf(Stream,EE);Stream.prototype.pipe=function(dest,options){...};module.exports=Stream;2.1.5/lib/_stream_readable.js在_stream_readable.js的实现中定义了Readable构造函数,继承自Stream。这个Stream就是我们上面提到的/lib/stream.js文件,在/lib/stream.js文件internal/streams/legacy文件中加载并重写里面定义的pipe方法。经过上面的一系列分析,终于找到了可读流的管道在哪里,同时进一步实现了创建可读流时的执行调用流程,下面将重点介绍该方法的实现。module.exports=Readable;Readable.ReadableState=ReadableState;constEE=require('events');constStream=require('stream');ObjectSetPrototypeOf(Readable.prototype,Stream.prototype);ObjectSetPrototypeOf(Readable,Stream);functionReadable(options){if(!(thisinstanceofReadable))returnnewReadable(options);...Stream.call(this,options);//继承自Stream构造函数的定义}...2.2_stream_readable实现分析2.2.1声明constructorReadable声明构造函数Readable继承了Stream的构造函数和原型。Stream是/lib/stream.js文件。上面分析过,这个文件继承了events事件,此时也有events原型中定义的属性,比如on,emit等方法。constStream=require('stream');ObjectSetPrototypeOf(Readable.prototype,Stream.prototype);ObjectSetPrototypeOf(Readable,Stream);functionReadable(options){if(!(thisinstanceofReadable))returnnewReadable(options);...Stream.call(this,options);}2.2.2声明pipe方法,订阅data事件在Stream原型上声明pipe方法,订阅data事件,src为可读流对象,dest为可写流对象。我们在使用pipe方法的时候,也是监听data事件,在读数据的同时写数据。查看ondata()方法中的几个核心实现:dest.write(chunk):接收chunk写入数据,如果内部buffer小于创建流时配置的highWaterMark,则返回true,否则返回false,停止发送数据向流写入数据,直到触发“drain”事件。src.pause():可读流会停止数据事件,也就是此时暂停数据写入。之所以调用src.pause()是为了防止数据读入太快来不及写入。什么时候来不及写?取决于dest.write(chunk)何时返回false,这是根据创建流时传递的highWaterMark属性,默认为16384(16KB),对象模式流默认为16。注意:16KB不是16Kb,这也是之前犯的错误。这里有大写B和小写b的区别。计算机中的所有数据都用0和1来表示,其中0或1称为位(bit),用小写的b表示。大写的B表示字节(byte),1byte=8bit,大写的K表示千位,所以就是千位(Kb)和千字节(KB),一般用KB来表示一个文件的大小。Readable.prototype.pipe=function(dest,options){constsrc=this;src.on('data',ondata);functionondata(chunk){constret=dest.write(chunk);if(ret===false){...src.pause();}}...};2.2.3订阅drain事件,继续流数据上面说了,在data事件中,如果调用dest.write(chunk)返回false,src会调用.pause()停止数据流,什么时候再开始?如果说当事件可以继续写入流时会触发drain事件,也是当dest.write(chunk)等于false时,如果ondrain不存在就注册drain事件。Readable.prototype.pipe=function(dest,options){constsrc=this;src.on('data',ondata);functionondata(chunk){constret=dest.write(chunk);if(ret===false){...if(!ondrain){//Whenthedestdrains,itreducestheawaitDraincounter//onthesource.Thiswouldbemoreelegantwitha.once()//handlerinflow(),butaddingandremovingrepeatedlyis//tooslow.ondrain=pipeOnDrain(src);dest.on('drain',ondrain);}src.pause();}}...};//当可写流dest耗尽时,会减少可读流对象source上的awaitDrain计数器//以保证所有缓冲的写入完成,即state.awaitDrain===0且src可读流上的数据事件存在,将流切换为流模式',state.awaitDrain);if(state.awaitDrain)state.awaitDrain--;if(state.awaitDrain===0&&EE.listenerCount(src,'data')){state.flowing=true;flow(src);}};}//stream.read()拉取并返回数据a来自内部缓冲区。如果没有要读取的数据,则返回null。在可读流src上也有一个readable属性,如果调用readable.read()functionflow(stream){conststate=stream._readableState;debug('flow',state.flowing);while(state.flowing&&stream.read()!==null);}2.2.4触发数据事件调用可读的resume()方法,触发可读流的'data'事件,进入流模式。Readable.prototype.pipe=function(dest,options){constsrc=this;//Starttheflowifithasn'tbeenstartedalready.if(!state.flowing){debug('piperesume');src.resume();}...然后实例上面的resume(定义在Readable原型上)会调用resume()方法,并在方法内部调用resume_(),最后执行stream.read(0)读取一个空数据(size设置为0),会触发实例上的_read()方法,然后触发数据事件。functionresume(stream,state){...process.nextTick(resume_,stream,state);}functionresume_(stream,state){debug('resume',state.reading);if(!state.reading){stream.read(0);}...}2.2.5订阅结束事件结束事件:当可读流中没有数据可供消费时触发,调用onend函数,执行dest.end()方法,表示没有数据被消费写入可写流,关闭(关闭可写流的fd),再调用stream.write()会报错。Readable.prototype.pipe=function(dest,options){...constdoEnd=(!pipeOpts||pipeOpts.end!==false)&&dest!==process.stdout&&dest!==process.stderr;constendFn=doEnd?onend:unpipe;if(state.endEmitted)process.nextTick(endFn);elsesrc.once('end',endFn);dest.on('unpipe',onunpipe);...functiononend(){debug('onend');dest.end();}}2.2.6触发管道事件在pipe方法中,最后会触发一个管道事件,传入一个可读流对象。Readable.prototype.pipe=function(dest,options){...constsource=this;dest.emit('pipe',src);...};在应用层使用时,可以在可写流上订阅pipe事件,进行一些判断。具体可以参考官网给出的例子stream_event_pipe2。2.7支持链式调用并最终返回dest,支持类unix用法:A.pipe(B).pipe(C)Stream.prototype.pipe=function(dest,options){returndest;};3.小结本文大致分为两部分:第一部分比较基础,讲解NodejsStream的pipe方法在Koa2中是如何应用的。第二部分还是在寻找它的实现,并对源码进行简单的分析。其实pipe方法的核心就是监听数据事件,将数据写入可写流。如果内部缓冲区大于创建流时配置的highWaterMark,则必须停止数据流,直到触发或结束drain事件,当然还必须监听end和error等事件进行一些处理。4.参考nodejs.cn/api/stream.htmlcnodejs.org/topic/56ba030271204e03637a3870github.com/nodejs/node/blob/master/lib/_stream_readable.js本文转载自微信公众号《Nodejs技术栈》,您可以通过下方二维码关注。转载本文请联系Nodejs技术栈公众号。
