当前位置: 首页 > 后端技术 > Node.js

一篇文章了解Node.js中的流

时间:2023-04-03 17:51:46 Node.js

作者:LizParodyStream)是出了名的难用,甚至难懂。用DominicTarr的话来说:“Streams是Node中最好的也是最容易被误解的想法。”即使是Redux的创建者和React.js核心团队成员DanAbramov也害怕Node流。本文将帮助您了解流以及如何使用它们。不用担心,您完全可以弄清楚!什么是流?流是支持Node.js应用程序的基本概念之一。它们是顺序从输入读取数据或将数据写入输出的数据处理方法。流是一种以高效方式处理读写文件、网络通信或任何类型的端到端信息交换的方法。流的处理方式很独特,它不像传统方式那样一次将文件全部读入内存,而是逐块读取数据块并处理数据内容,而不是将其全部保存在内存中。这种方法使得流在处理大量数据时非常强大,例如,文件大小可能大于可用内存空间,导致无法将整个文件读入内存进行处理。这就是流进来的地方!流可用于处理较小的数据块,或读取较大的文件。以YouTube或Netflix等“流媒体”服务为例:这些服务不允许您立即下载视频和音频文件。相反,您的浏览器会以连续的块流形式接收视频,从而使接收者几乎可以立即开始观看和收听。然而,流媒体不仅仅是处理媒体和大数据。它们还赋予我们代码“可组合性”的力量。考虑到可组合性进行设计意味着能够以某种方式组合多个组件以产生相同类型的结果。在Node.js中,强大的代码片段可以由在其他较小的代码片段之间传递数据的流组成。为什么使用流与其他数据处理方法相比,流基本上有两个主要优势:内存效率:不需要事先将大量数据加载到内存中进行处理时间效率:获取数据后立即启动有Node.js中的4种类型的流:可写的:可以写入数据的流。例如,fs.createWriteStream()允许我们使用流将数据写入文件。可读:可以从中读取数据的流。例如:fs.createReadStream()让我们读取文件的内容。双工:可读可写流。例如,net.SocketTransform:可以在写入和读取的同时修改或转换数据。例如在文件压缩的??情况下,您可以将压缩数据写入文件并从文件中读取解压缩数据。如果您使用过Node.js,您可能遇到过流。例如,在基于Node.js的HTTP服务器中,请求是一个可读流,响应是一个可写流。您可能已经使用过fs模块,它允许您使用可读和可写的文件流。每当您使用Express时,您都在使用流与客户端交互,并且由于TCP套接字、TLS堆栈和其他连接都基于Node.js,因此在您可以使用的每个数据库连接驱动程序中都使用流。如何创建可读流的实际示例首先获取可读流,然后对其进行初始化。constStream=require('stream')constreadableStream=newStream.Readable()现在流已经初始化并且可以向它发送数据:readableStream.push('ping!')readableStream.push('pong!')asynchronous强烈建议在使用流时将迭代器与异步迭代器一起使用。根据AxelRauschmayer博士的说法,异步迭代是一种用于异步检索数据容器内容的协议(这意味着当前“任务”可以在检索项目之前暂停)。还必须提到的是,流异步迭代器实现在内部使用了可读事件。从可读流中读取时,可以使用异步迭代器:import*asfsfrom'fs';asyncfunctionlogChunks(readable){forawait(constchunkofreadable){console.log(chunk);}}constreadable=fs.createReadStream('tmp/test.txt',{encoding:'utf8'});logChunks(readable);//output://'Thisisatest!\n'也可以采集通过字符串读取流的内容:import{Readable}from'stream';asyncfunctionreadableToString2(readable){letresult='';forawait(constchunkofreadable){result+=chunk;}returnresult;}constreadable=Readable.from('Goodmorning!',{encoding:'utf8'});assert.equal(awaitreadableToString2(readable),'Goodmorning!');请注意,在这种情况下必须使用异步函数,因为我们希望返回一个Promise。请记住不要将异步函数与EventEmitter混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。当前的最佳实践是始终将异步函数的内容包装在try/catch块中并处理错误,但这是容易出错的。这个pullrequest旨在修复Node核心上的问题。要了解有关异步迭代的Node.js流的更多信息,请查看这篇很棒的文章。Readable.from():从可迭代对象创建可读流stream.Readable.from(iterable,[options])这是一个实用方法,用于从持有可迭代对象的迭代器创建可读流。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,可用于指定文本编码等。const{Readable}=require('stream');asyncfunction*generate(){yield'hello';yield'streams';}constreadable=Readable.from(generate());readable.on('data',(chunk)=>{console.log(chunk);});两种读取模式根据StreamsAPI,可读流以两种模式之一有效地运行:流动和暂停。可读流可以是对象模式,可以是流动模式,也可以是暂停模式。在流模式下,数据自动从底层系统读取,并通过EventEmitter接口使用事件尽快提供给程序。在暂停模式下,必须显式调用stream.read()方法以从流中读取数据块。在流动模式下,要从流中读取数据,您可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件并执行您的回调。请参阅下面的代码片段:varfs=require("fs");vardata='';varreaderStream=fs.createReadStream('file.txt');//创建可读流readerStream.setEncoding('UTF8');//设置编码为utf8。//处理流事件-->数据、结束和错误数据);});readerStream.on('error',function(err){console.log(err.stack);});console.log("程序结束");函数调用fs.createReadStream()为您提供可读流。最初,流处于静态。一旦您侦听数据事件并附加回调,它就会开始流动。之后,数据块将被读取并传递给您的回调。流实施者决定发送数据事件的频率。例如,每次读取几KB数据时,HTTP请求可能会发出一个数据事件。从文件中读取数据时,您可能决定在读取一行后发出数据事件。当没有更多数据可读(结束)时,流发出结束事件。在上面的代码片段中,我们侦听此事件以在完成时收到通知。此外,如果出现错误,流将发出并通知错误。在暂停模式下,您只需在流实例上重复调用read()直到读取所有块,如下例所示:varfs=require('fs');varreadableStream=fs.createReadStream('file.txt');vardata='';varchunk;readableStream.on('readable',function(){while((chunk=readableStream.read())!=null){data+=chunk;}});readableStream.on('结束',function(){console.log(data)});read()函数从内部缓冲区读取一些数据并将其返回。当没有可读取的内容时返回null。所以在while循环中,我们检查null并终止循环。请注意,当可以从流中读取大量数据时,会发出可读事件。所有可读流都以暂停模式开始,但可以通过以下方式之一切换到流动模式:添加“数据”事件处理程序。调用stream.resume()方法。调用stream.pipe()方法向可写对象发送数据。Readable可以通过以下方式之一切换回暂停模式:如果没有管道目的地,则通过调用stream.pause()方法。删除所有管道目标(如果有)。可以通过调用stream.unpipe()方法删除多个管道目标。要记住的一个重要概念是,Readable不会产生数据,除非提供了一种机制来使用或忽略该数据。如果消费机制被禁用或取消,Readable将尝试停止生产数据。添加可读事件处理程序将自动停止流流动并通过read.read()获取数据。如果可读事件处理程序被移除,那么如果存在“数据”事件处理程序,流将再次开始流动。如何创建可写流要将数据写入可写流,您需要在流实例上调用write()。如下例所示:varfs=require('fs');varreadableStream=fs.createReadStream('file1.txt');varwritableStream=fs.createWriteStream('file2.txt');readableStream.setEncoding('utf8');readableStream.on('data',function(chunk){writableStream.write(chunk);});上面的代码非常简单。它只是从输入流中读取数据块并使用write()写入目标。该函数返回一个布尔值,指示操作是否成功。如果为真,则写入成功,您可以继续写入更多数据。如果它返回false,说明出了点问题,你现在不能写任何东西。可写流将在它可以开始写入更多数据时通过发出drain事件通知您。调用writable.end()方法表示不再有数据写入Writable。如果提供,将附加一个可选的回调函数作为完成事件的侦听器。//写入'hello,'然后以'world!'结束。constfs=require('fs');constfile=fs.createWriteStream('example.txt');file.write('hello,');file.end('world!');//现在不允许再写了!您可以使用可写流从可读流中读取数据:constStream=require('stream')constreadableStream=newStream.Readable()constwritableStream=newStream.Writable()writableStream._write=(chunk,encoding,next)=>{console.log(chunk.toString())next()}readableStream.pipe(writableStream)readableStream.push('ping!')readableStream.push('pong!')writableStream.end()也可以使用异步迭代器编写可写流,推荐使用import*asutilfrom'util';import*asstreamfrom'stream';import*asfsfrom'fs';import{once}from'events';constfinished=util.promisify(stream.finished);//(A)asyncfunctionwriteIterableToFile(iterable,filePath){constwritable=fs.createWriteStream(文件路径,{编码:'utf8'});forawait(constchunkofiterable){if(!writable.write(chunk)){//(B)//处理backp压力等待一次(可写,'drain');}}writable.end();//(C)//等到完成。如果有错误则抛出。awaitfinished(writable);}awaitwriteIterableToFile(['One','lineoftext.\n'],'tmp/log.txt');assert.equal(fs.readFileSync('tmp/log.txt',{encoding:'utf8'}),'一行文字.\n');stream.finished()的默认版本是基于回调的,但可以通过util.promisify()转换为基于Promise的版本(A行)在这个例子中,使用了以下两种模式:写入可写流在处理背压时(B行):在处理背压时写入可写流(B行):if(!writable.write(chunk)){awaitonce(writable,'drain');}关闭可写流,并等待写入完成(C行):writable.end();等待完成(可写);pipeline()管道是一种机制,通过它一个流的输出可以用作另一个流的输入。它通常用于从一个流中获取数据并将该流的输出传递给另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流式数据。stream.pipeline()是在Node10.x中引入的。这是一个模块方法,用于在流转发错误和正确清理之间进行管道传输,在管道完成时提供回调。这是一个使用管道的例子:const{pipeline}=require('stream');constfs=require('fs');constzlib=require('zlib');//使用管道API轻松转换一系列流//将管道组合在一起,并在管道完全完成时收到通知。//一个高效压缩巨大视频文件的管道:pipeline(fs.createReadStream('The.Matrix.1080p.mkv'),zlib.createGzip(),fs.createWriteStream('The.Matrix.1080p.mkv.gz'),(err)=>{if(err){console.error('管道失败',err);}else{console.log('管道成功');}});由于管道不安全,应该使用管道而不是管道。流模块Node.js流模块提供了构建所有流API的基础。Stream模块是Node.js默认提供的原生模块。Stream是EventEmitter类的一个实例,它在Node.js中异步处理事件。所以流本质上是基于事件的。访问流模块:conststream=require('stream');流模块对于创建新类型的流实例很有用。通常不需要使用流模块来消费流。流驱动的NodeAPI由于其优势,许多Node.js核心模块提供了原生流处理功能,最值得注意的是:net.Socket是流所基于的主要API节点,它是以下大部分的底层进程API.stdin返回连接到stdin的流process.stdout返回连接到stdout的流process.stderr返回连接到stderr的流fs.createReadStream()创建一个可读文件流fs.createWriteStream()创建一个可写文件流net.connect()发起一个基于流的连接http.request()返回一个http.ClientRequest类的实例,它是一个可写流zlib.createGzip()使用gzip(一种压缩算法)将数据压缩成一个流zlib.createGunzip()解压缩gzip流。zlib.createDeflate()deflate(压缩算法)将数据压缩成流zlib.createInflate()解压一个deflate流流备忘单:查看更多:Node.js流量查找表下面是与可写流相关的一些重要事件:error–表示在编写或配置管道时发生错误。pipeline–当可读流被传递到可写流时,可写流发出此事件。unpipe–在可读流上调用unpipe并停止将其通过管道传输到目标流时发出。结论这就是有关流的全部基础知识。流、管道和链是Node.js的核心和最强大的特性。Streams确实可以帮助您编写简洁高效的代码来执行I/O。此外,还有一个名为BOB的预期Node.js战略计划,旨在改善Node.js的内部数据流,并希望公共API将作为未来的Node.js流数据接口。