本文节选自Node.jsCheatSheet|Node.js语法基础、框架使用与实战技巧,你也可以阅读JavaScriptCheatSheet或现代Web开发基础与工程实践,了解更多JavaScript/Node.js实战应用。Stream是Node.js中的一个基本概念,类似于EventEmitter,专注于IO管道中事件驱动的数据处理;类比数组或映射,Stream也是数据的集合,但它代表的数据不一定在内存中。.Node.js的Stream分为以下几种:ReadableStream:可读流,数据生产者,如process.stdinWritableStream:可写流,数据消费者,如process.stdout或process.stderrDuplexStream:双向流,它可以读或写。TransformStream:转换流,数据转换器Stream本身提供了一套接口规范,Node.js中的很多内置模块都遵循这个规范。比如著名的fs模块就是使用Stream接口读写文件;同样,每个HTTP请求都是一个可读流,而一个HTTP响应是一个可写流。可读流conststream=require('stream');constfs=require('fs');constreadableStream=fs.createReadStream(process.argv[2],{encoding:'utf8'});//手动设置流数据编码//readableStream.setEncoding('utf8');letwordCount=0;readableStream.on('data',function(data){wordCount+=data.split(/\s{1,}/).length;});readableStream.on('end',function(){//不计算文件结尾。console.log('%d%s',--wordCount,process.argv[2]);});当我们创建一个可读流时,它还没有开始数据流;添加数据事件侦听器将使其流动动态。之后,它读取一小块数据并将其传递给我们的回调函数。数据事件的触发频率也由实现者决定。比如读取文件时,可能每行触发一次;当处理一个HTTP请求时,可能只有几KB的数据才会触发一次。可以参考nodejs/readable-stream/_stream_readable中的相关实现,发现on函数会触发resume方法,会调用flow函数读取流://functiononif(ev==='data'){//如果流没有明确暂停,则在下一个滴答开始流动if(this._readableState.flowing!==false)this.resume();}...//functionflowwhile(state.flowing&&stream.read()!==null){}我们也可以监听readable事件,然后手动读取数据:letdata='';letchunk;readableStream.on('readable',function(){while((chunk=readableStream.read())!=null){data+=chunk;}});readableStream.on('end',function(){console.log(data);});ReadableStream还包括以下常用方法:Readable.pause():该方法暂停流的流动。换句话说,它将不再触发数据事件。Readable.resume():这个方法和上面的相反,会恢复暂停的流。Readable.unpipe():此方法将删除目标。如果传入,它会阻止Readable流流向特定目的地,否则,它会删除所有目的地。在日常开发中,我们可以使用stream-wormhole来模拟可读流的消费:。结尾();调用end()时,将写入所有数据并且流将触发完成事件。请注意,在调用end()之后,您将无法再向可写流写入数据。const{Writable}=require('stream');constoutStream=newWritable({write(chunk,encoding,callback){console.log(chunk.toString());callback();}});process.stdin.pipe(outStream);可写流还包含一些与可读流相关的重要事件:pipelineconstfs=require('fs');constinputFile=fs.createReadStream('REALLY_BIG_FILE.x');constoutputFile=fs.createWriteStream('REALLY_BIG_FILE_DEST.x');//当管道建立时,流的流动发生inputFile.pipe(outputFile);顺序调用多个管道,即建立链接(Chaining):constfs=require('fs');constzlib=require('zlib');fs.createReadStream('input.txt.gz').pipe(zlib.createGunzip()).pipe(fs.createWriteStream('output.txt'));管道也常用于web服务器中的文件处理,以Egg.js中的应用为例,我们可以从Context中获取文件流,传入可写文件流:完整代码参考BackendBoilerplate/eggconstawaitWriteStream=require('await-stream-ready').write;constsendToWormhole=require('stream-wormhole');...conststream=awaitctx.getFileStream();constfilename=md5(stream.filename)+path.extname(stream.filename).toLocaleLowerCase();//文件生成绝对路径consttarget=path.join(this.config.baseDir,'app/public/uploads',filename);//生成文件写入文件流constwriteStream=fs.createWriteStream(target);try{//异步写入文件流awaitawaitWriteStream(stream.pipe(writeStream));}catch(err){//如果出错,关闭管道awaitsendToWormhole(stream);throwerr;}...参考分布式系统的介绍,我们可以看到,在典型的流处理场景中,我们都无法避免处理所谓的背压(Backpressure)问题无论是WritableStream还是ReadableStream,数据实际上存储在内部Buffer中,可以通过writable.writableBuffer或者readable.readableBuffer来读取。当要处理的数据存储超过highWaterMark或者当前写流繁忙时,write函数会返回false。pipe函数会自动为我们启用背压机制:当Node.js的流机制检测到write函数返回false时,背压系统会自动介入;它将暂停当前ReadableStream的数据传输操作,直到消费者就绪完成。+===============+|Your_Data|+=======+=======+|+--------v------------++------------------++=================+|可读流||可写流+-------->.write(chunk)|+--------+------------++--------^--------++=======+=========+||||+======================+|+----------------v--------++----->.pipe(目的地)>---+|这个块太大了吗?|+==^=======^========^==+|排队很忙吗?|^^^+------------+------------+---+||||||||>如果(!大块)||^||发出.end();||^^|>否则|||^|发出.write();+---v---++---v---+||^----^----------------<否||是|^|+-------++---v---+^|||^发出.pause();+=================+||^---^--------------------+返回假;<-----+---+|+=================+|||^当队列为空时+============+|^---^----------------^---<缓冲||||============||+>艾米t.drain();|<缓冲区>||+>发出.resume();+------------+||<缓冲区>||+------------+将块添加到队列||<--^----------------<+=============+DuplexStreamDuplexStream可以看作是read的聚合和写流,包括两个相互独立的读写流,有独立的内部缓存。读写操作也可以异步进行:DuplexStream----------------|阅读<-----外部资源你------------------|Write----->ExternalSink-------------------|我们可以使用Duplex来模拟简单的socket操作:const{Duplex}=require('stream');类Duplexer扩展Duplex{constructor(props){super(道具);这个.data=[];}_read(size){constchunk=this.data.shift();if(chunk=='stop'){this.push(null);}else{if(chunk){this.push(chunk);}}}_write(chunk,encoding,cb){this.data.push(chunk);cb();}}constd=newDuplexer({allowHalfOpen:true});d.on('data',function(chunk){console.log('read:',chunk.toString());});d.on('readable',function(){console.log('readable');});d.on('end',function(){console.log('MessageComplete');});d.write('....');在开发中,我们经常需要直接将一个可读流输出到一个可写流,也可以在其中引入PassThrough来进行额外的监控:const{PassThrough}=require('stream');constfs=require('fs');constduplexStream=newPassThrough();//可以从可读流传输fs.createReadStream('tmp.md').pipe(duplexStream);//可以传输到可写流duplexStream.pipe(process.stdout);//监听Data,这里直接输出Buffer
