官方定义中的Stream(流)Stream(流)是Node.js中处理流数据的抽象接口。流模块用于构造实现流接口的对象。我们一般直接使用node提供的stream对象,比如在serverrequests和filemodules中。Stream的分类Writable-可以写入数据的流(如fs.createWriteStream())。可读-可以从中读取数据的流(例如fs.createReadStream())。双工-可读可写流(例如net.Socket)。Transform——Duplex流的重要事件和方法(如zlib.createDeflate()),可以在读写过程中修改或转换数据。下面将结合具体的例子,对流的常见事件和方法进行梳理,加深对流的理解。1.数据和结束事件流类型:可读流数据事件在可读流向消费者传输数据后触发。特别地,添加了这个事件的流会自动切换到流模式,结束事件会在流没有数据可供消费时触发:conststream=fs.createReadStream('./file.txt')//默认staticletchunks=[]stream.on("data",(chunk)=>{//变成流式动态chunks.push(chunk)})stream.on("end",()=>{constcontent=Buffer.concat(chunks).toString()console.log(content)})其中chunk为buffer类型补充,任何时刻,可读流都会处于以下三种状态之一:readable.readableFlowing===nullreadable.readableFlowing===falsereadable.readableFlowing===true一开始readable.readableFlowing为null,添加数据事件后变为true。调用readable.pause()、readable.unpipe(),或者接收背压,readable.readableFlowing会被置为false。在这种状态下,数据事件的绑定监听器不会使readable.readableFlowing切换到true2,可读事件和read()方法流类型:可读流可读事件表明流有新的动态:要么有新数据,要么到达流的末尾。下面是一个读取文件的例子:conststream=fs.createReadStream('./file.txt')letchunks=[]//流中没有数据也会触发readable,read方法会获取null。//Read也会在流结束时触发,结束事件之前stream.on("readable",()=>{console.log('triggerreadable');letdata;while(data=stream.read(1024)){chunks.push(data)console.log('读取数据',data);}})stream.on("end",()=>{constcontent=Buffer.concat(chunks.toString();console.log(content)})使用readable会使流的状态进入暂停模式,即使数据事件被监听。data事件在调用read方法有返回数据时触发。上面代码中,read方法读取内部缓冲区中的数据。如果未指定大小参数,则读取内部缓冲区中的所有数据。请注意,它不是流中的所有数据。如果不指定大小,就不用while循环了,直接一次性读取,while循环代码块可以改成:data=stream.read()data&&chunks.push(data)console.log('读取数据',data);根据运行结果来看,read方法的readable事件是在读取到buffer数据后,即read()返回null时触发的。这是读取可读流的第二种方式,即通过read()读取3、pipe()和unpipe()流类型:可读流的定义见官方文档。以下示例使用管道响应http请求consthttp=require('http')constfs=require('fs')constserver=http.createServer()server.on('request',(request,response)=>{conststream=fs.createReadStream('./file.txt')stream.pipe(response)})server.listen(8888)使用pipe时,自动管理数据流,所以即使可读流更快,目标可写流将不会被重载。此外,pipe()将返回对目标流的引用并支持链式操作。假设b是一个转换流:a.pipe(b).pipe(c)`unpipe()是一个可写流,用于在绑定之前解除绑定。上面的例子可以用数据事件重写://....server.on('request',(request,response)=>{conststream=fs.createReadStream('./file.txt')stream.on("data",(chunk)=>{response.write(chunk)})stream.on("end",()=>{response.end()//如果使用管道,会触发结束事件默认在可读流上然后调用end()结束写})})//...但是这样写可能会使可写流过载,这就需要引入drain的概念4.drainandfinisheventstreamtypes:writablestream如果在可写流上调用write()返回false,表示写入速度太快,无法再写入。当可以继续将数据写入流时,将触发“drain”事件。对于上面的例子,我们来测试下drain事件是否被触发://....server.on('request',(request,response)=>{conststream=fs.createReadStream('./file.txt')stream.on("data",(chunk)=>{response.write(chunk)})stream.on("end",()=>{response.end()})response.on("drain",()=>{console.log('可以写')})})//...文件file.txt的大小约为100kb。运行后会触发drain事件4次。虽然写的太快了,但是从http响应的结果来看,数据并没有丢失。查了下文档,看到有这样的解释:当stream还没有被清空时,调用write()会缓冲chunk并返回false。一旦所有当前缓冲的数据块都被排空(由操作系统接收和传输),就会触发“排空”事件。建议一旦write()返回false,将不再写入数据块,直到触发'drain'事件。当流还没有被排空时,也可以调用write(),Node.js会缓冲所有写入的数据块,直到达到最大内存使用量,此时它会无条件中止。所以当write()返回false时,不要向其中写入数据。上面的例子可以这样优化:server.on('request',(request,response)=>{conststream=fs.createReadStream('./file.txt')letok=truestream.on("data",(chunk)=>{ok=response.write(chunk)if(!ok){stream.pause()console.log('Don'twrite')ok=true}})stream.on("end",()=>{response.end()})response.on("drain",()=>{console.log('可以写')stream.resume()})})有点麻烦这么写,还是直接用pipe()比较方便。这里其实涉及到流缓冲的概念和背压问题,可以查看相关文档进一步研究。finish事件在调用end()并且所有缓冲数据都已传递到底层系统后触发。5.Pause()和resume()流类型:可读流动态和静态切换,改变数据事件是否触发6.write()和end()流类型:可写流上面的例子已经涉及到。write是向可写流写入数据,end表示写入完成,不能再长时间调用write
