当前位置: 首页 > 后端技术 > Node.js

NodeStream运行机制

时间:2023-04-03 12:50:20 Node.js

如果你是学习Node的,那么stream一定是你需要掌握的一个概念。想要成为Node高手,流量一定是武林秘籍不可或缺的一部分。引自Stream-Handbook。由此可见streaming对于Node深度学习的重要性。什么是流你可以把streaming理解为一种传输能力。使用流,数据可以以优雅的方式无副作用地传输到目的地。在Node中,NodeStream创建的流专用于String和Buffer,一般使用Buffer。Stream代表一种传输能力,Buffer是传输内容的载体(可以这样理解,Stream:外卖小哥,Buffer:你的外卖)。创建stream时,将ObjectMode设置为true,Stream也可以传输任意类型的JS对象(null除外,null在stream中有特殊用途)。为什么要使用流?现在有一个需求,我们要给客户端传输一个大文件。如果使用下面的方法constfs=require('fs');constserver=require('http').createServer();server.on('request',(req,res)=>{fs.readFile('./big.file',(err,data)=>{if(err)throwerr;res.end(data);});});server.listen(8000);每次接收到请求时,将这个大文件读入内存,然后再将其传输给客户端。这种方法可能会产生以下三种后果:内存耗尽会减慢其他进程并增加垃圾收集器的负载,因此在传输大文件时这种方法不是一个好的解决方案。并发量大,上百个请求过来很容易内存溢出。如果使用流怎么办?constfs=require('fs');constserver=require('http').createServer();server.on('request',(req,res)=>{constsrc=fs.createReadStream('./big.file');src.pipe(res);});server.listen(8000);这样就不会占用太多内存,整个过程很流畅,也很优雅。如果你想在传输过程中对文件进行处理,比如压缩、加密等,也很容易扩展(后面会详细介绍)。流在Node.js中无处不在。从下图可以看出:Stream的分类Stream分为四类:Readable(可读流)Writable(可写流)Duplex(双工流)Transform(转换流)Readable可读流中的数据如下数据可以在两种模式下产生。FlowingMode和Non-FlowingMode两种模式的触发方式和消耗方式不同。流动模式:数据会不断产生,形成“流动”现象。通过监听流的数据事件可以进入该模式。在Non-FlowingMode中:您需要显式调用read()方法来获取数据。这两种模式可以相互转换。流的初始状态为Null。通过监听data事件或者pipe方法,调用resume方法,将流转为FlowingMode状态。在FlowingMode状态下调用pause方法将流设置为Non-FlowingMode状态。在Non-FlowingMode状态下调用resume方法也可以将流设置为FlowingMode状态。下面详细介绍Readable流在以下两种模式下的运行机制。FlowingMode在FlowingMode状态下,创建的myReadable读取流,直接监听数据事件,数据会源源不断的流出供消费。myReadable.on('data',function(chunk){consume(chunk);//消费流程})一旦监听到数据事件,Readable的内部流程如下图所示。核心方法是流内部的read方法,在参数n取不同值时,分别触发不同的操作。下面描述中的hightwatermark表示流内部缓冲池的大小。n=undefined(消费数据,并触发可读流)n=0(触发可读流,但不消费)n>hightwatermark(修改hightwatermark的值)nbuffer(可以返回null,也可以返回buffer中的所有数据(当时最后一次读取))图中黄色标记的_read()是用户需要自己实现实现流的方法。这个方法是真正读取流的方式(可以这样理解,外卖平台为你提供了提供外卖的能力,_read()方法相当于给你订了外卖)。_read方法如何实现,后面会详细介绍。上面的过程可以描述为:监听data方法,Readable会调用里面的read方法触发读流操作,通过判断是同步读还是异步读来判断读到的数据是否放在buffer中。如果是异步的,那么需要调用flow方法继续触发read方法读取流,同时根据size参数判断是否emit('data')消费流,以及循环读取。如果是同步的,则emit('data')消费流,同时继续触发read方法读取流。一旦push方法传入null,整个流就结束了。从用户的角度来看,在这种模式下,您可以通过以下方式使用流constfs=require('./fs');constreadFile=fs.createReadStream('./big.file');constwriteFile=fs.createWriteStream('./writeFile.js');readFile.on('data',function(chunk){writeFile1.write(chunk);})Non-FlowingMode相对于Flowingmode,Non-FlowingMode要简单的多。在这种模式下消费流需要在Non-FlowingMode下使用如下方法myReadable.on('readable',function(){constchunk=myReadable.read()consume(chunk);//consumestream}),Readable的内部流程如下:从这张图可以看出,如果要实现这种模式的读流,还需要实现一个_read方法。整个过程如下:监听readable方法,Readable内部会调用read方法。调用用户实现的_read方法将数据推送到缓冲池,然后发送emitreadable事件通知用户消费。从用户的角度来看,您可以通过以下方式使用此模式下的流constfs=require('fs');constreadFile=fs.createReadStream('./big.file');constwriteFile=fs.createWriteStream('./writeFile.js');readFile.on('readable',function(chunk){while(null!==(chunk=myReadable.read())){writeFile.write(chunk);}});Writable与读流相比,写流的机制更容易理解。写入流使用以下方法将数据写入myWrite.write(chunk);调用write后,内部的Writable流程类似于读流如下图所示。要实现写流,用户还需要实现一个_write方法。整个过程是这样的:调用write后,会先判断是否写入buffer。如果没有,则调用用户实现的_write方法将流写入相应的地方,_write会调用一个writeable内部的回调函数。从消费者的角度来看,以下面代码所示的方式使用写入流。constfs=require('fs');constreadFile=fs.createReadStream('./big.file');constwriteFile=fs.createWriteStream('./writeFile.js');readFile.on('data',function(chunk){writeFile.write(chunk);})如您所见,使用写入流非常简单。我们先解释一下读流和写流是怎么实现的,然后再看看Duplex和Transform是什么,因为理解了读流和写流是怎么实现的,那么理解Duplex和Transform就很简单了。实现一个自定义的Readable实现一个自定义的Readable,只需要实现一个_read方法,在_read方法中调用push方法即可实现数据生产。如下代码所示:constReadable=require('stream').Readable;classMyReadableextendsReadable{constructor(dataSource,options){super(options);this.dataSource=数据源;}_read(){常量数据=this.dataSource.makeData();setTimeout(()=>{this.push(data);});}}//模拟资源池constdataSource={data:newArray(10).fill('-'),makeData(){if(!dataSource.data.length)returnnull;返回数据源.data.pop();}};constmyReadable=newMyReadable(dataSource,);myReadable.on('readable',()=>{letchunk;while(null!==(chunk=myReadable.read())){console.log(大块);}});实现自定义可写要实现自定义可写,只需要一个_write方法。消费_write中的chunk写入对应的地方,调用callback回调。如下代码所示:constWritable=require('stream').Writable;classMywritableextendsWritable{constuctor(options){super(options);}_write(chunk,endcoding,callback){console.log(chunk);回调&&回调();}}constmyWritable=newMywritable();Duplex双工流:简单理解,就是将一个Readable流和一个Writable流绑定在一起,既可以作为读流也可以作为写流。要实现双工流,您需要同时实现_read和_write方法。需要注意的一点是,它包含的Readablestream和Writablestream是完全独立的,相互独立的,两个stream不使用同一个buffer。通过下面的代码可以验证//模拟资源池1constdataSource1={data:newArray(10).fill('a'),makeData(){if(!dataSource1.data.length)returnnull;返回dataSource1.data.pop();}};//模拟资源池2constdataSource2={data:newArray(10).fill('b'),makeData(){if(!dataSource2.data.length)returnnull;返回dataSource2.data.pop();}};constReadable=require('stream').Readable;classMyReadableextendsReadable{constructor(dataSource,options){super(options);this.dataSource=数据源;}_read(){常量数据=this.dataSource.makeData();setTimeout(()=>{this.push(data);})}}constWritable=require('stream').Writable;classMyWritableextendsWritable{constructor(options){super(options);}_write(chunk,encoding,callback){console.log(chunk.toString());回调&&回调();}}constDuplex=require('stream').Duplex;classMyDuplexextendsDuplex{constructor(dataSource,options){super(options);this.dataSource=数据源;}_read(){常量数据=this.dataSource.makeData();setTimeout(()=>{this.push(data);})}_write(chunk,encoding,callback){console.log(chunk.toString());回调&&回调();}}constmyWritable=newMyWritable();constmyReadable=newMyReadable(dataSource1);constmyDuplex=newMyDuplex(dataSource1);myReadable.pipe(myDuplex).pipe(myWritable);打印结果为ababababababababab从这个结果可以看出myReadable.pipe(myDuplex),myDuplex作为一个写流,写入的内容是一个;myDuplex.pipe(myWritable),myDuplex作为读流,但b写入myWritable;所以它包含的Readable流和Writable流是完全独立的。Transform理解Duplex,不如理解Transform。Transform是一个转换流,同时具有读写功能,但它与Duplex不同的是它的读流和写流共享同一个缓冲区;也就是说,通过它读什么,它就可以写什么。实现一个Transform,只需要实现一个_transform方法即可。比如最简单的Transform:PassThrough,它的源码如下PassThrough是一个Transform,但是这个转换流什么都不做,相当于一个透明的转换流。可以看到_transform里面什么都没有,就是简单的回调数据。如果我们在这个环节做一些扩展,只需要直接在_transform中扩展即可。例如,我们可以对流进行压缩、加密、混淆等。BackPress终于在流中引入了一个非常重要的概念:背压。要理解这一点,我们先来看看pipe和highWaterMaker是什么。pipe先看下面的代码constfs=require('./fs');constreadFile=fs.createReadStream('./big.file');constwriteFile=fs.createWriteStream('./writeFile.js');readFile.pipe(writeFile);上面和下面的代码是等价的constfs=require('./fs');constreadFile=fs.createReadStream('./big.file');constwriteFile=fs.createWriteStream('./writeFile.js');readFile.on('data',function(data){varflag=ws.write(data);if(!flag){//当前写流缓冲区已满,暂停读数据readFile.pause();}})writeFile.on('drain',function()){readFile.resume();//当前写流缓冲区已经清空,重新开始读流}readFile.on('end',function(data){writeFile.end();//写入写流缓冲区中的所有数据,并关闭写入的文件})pipe所做的操作相当于写流和读流的速度匹配是自动完成的。如果读写流速度不匹配,一般情况下不会造成任何问题,但会导致内存增加。增加内存消耗可能会导致一系列问题。所以在使用流的时候,强烈推荐使用管道。highWaterMakerhighWaterMaker说白了,就是定义缓冲区的大小。默认16Kb(可读最大8M)可自定义。背压的概念可以理解为:一种防止读写流速度不匹配的调整机制;背压调节机制的触发时机受限于highWaterMaker设置的大小。如上面代码varflag=ws.write(data);,一旦写流的buffer满了,flag就会被置为false,从而反过来促进读流的速度调整。Stream的应用场景主要有以下几个场景1、文件操作(复制、压缩、解压、加密等)下面的函数可以轻松实现文件复制。constfs=require('fs');constreadFile=fs.createReadStream('big.file');constwriteFile=fs.createWriteStream('big_copy.file');读文件管道(写文件);那么我们要在复制过程中压缩文件呢?constfs=require('fs');constreadFile=fs.createReadStream('big.file');constwriteFile=fs.createWriteStream('big.gz');constzlib=require('zlib');readFile。管道(zlib.createGzip())。管道(writeFile);类似实现解压和加密。2.比如静态文件服务器需要返回一个html,可以使用如下代码。varhttp=require('http');varfs=require('fs');http.createServer(function(req,res){fs.createReadStream('./a.html').pipe(res);}).听(8000);