当前位置: 首页 > 科技观察

彻底掌握 Node.js 四大流,解决爆缓冲区的“背压”问题

时间:2023-03-15 21:05:20 科技观察

彻底掌握Node.js的四大流,解决突发缓冲区的“背压”问题。如果这东西重一吨怎么办?然后一块一块地移动。其实IO就是搬东西,包括网络IO和文件IO。如果数据量不大,那么可以直接传输所有内容。可以一部分一部分的处理,这就是流的思想。所有语言基本都实现了streamapi,Node.js也是,streamapi也是比较常用的。让我们探索下面的流。本文将回答以下问题:Node.js的4个流是什么?生成器如何与可读流结合?什么是背压问题?Node.js的4个流如何解决直观感觉Stream从一个地方到另一个地方,明显有流出的一面和流入的一面。流出端是可读流(readable),流入端是可写流(writable)。当然,也有既能流入又能流出的溪流。这称为双工流。既然可以流入流出,那么流入的内容能不能降频再流出呢?这个流叫做转换流(transform的流入和流出内容)双工流不需要关联,但是transform流的流入和流出内容是关联的,这是两者的区别。流apiNode.js提供的流就是上面介绍的四种:conststream=require('stream');//可读流constReadable=stream.Readable;//可写流constWritable=stream.Writable;//双工作流constDuplex=stream.Duplex;//转换流constTransform=stream.Transform;它们都有要实现的方法:Readable需要实现_read方法来返回内容Writable需要实现_write方法来接受内容Duplex需要实现_read和_write方法来接受和返回内容。Transform需要实现_transform方法,对接收到的内容进行转换并返回。我们单独来看一下:ReadableReadable需要实现_read方法,通过push返回具体的数据。constStream=require('stream');constreadableStream=Stream.Readable();readableStream._read=function(){this.push('阿门前的一棵藤,');this.push('阿东阿东绿色的刚刚发芽,');this.push('阿东背着那沉重的壳,');this.push('一步步往上爬。')this.push(null);}readableStream.on('data',(data)=>{console.log(data.toString())});readableStream.on('end',()=>{console.log('done~');});推送空值时,表示流结束。执行效果如下:创建一个Readable也可以通过继承来实现:constStream=require('stream');classReadableDongextendsStream.Readable{constructor(){super();}_read(){this.push('AmenA以前的葡萄树,');this.push('阿东阿东绿刚发芽,');this.push('阿东背着那沉重的壳,');this.push('一步步往上爬。')this.push(null);}}constreadableStream=newReadableDong();readableStream.on('data',(data)=>{console.log(data.toString())});readableStream.on('end',()=>{console.log('完成~');});可读流生成内容,所以很自然地结合生成器:constStream=require('stream');classReadableDongextendsStream.Readable{constructor(iterator){super();this.iterator=iterator;}_read(){constnext=this.iterator.next();if(next.done){returnthis.push(null);}else{this.push(next.value)}}}function*songGenerator(){yield'阿门,你面前的一棵藤蔓,';yield'阿东,阿东的绿刚发芽,';yield'阿东背着那么重的shell,';yield'一步步往上爬。';}constsongIterator=songGenerator();constreadableStream=newReadableDong(songIterator);readableStream.on('data',(data)=>{console.log(data.toString())});readableStream.on('end',()=>{console.log('完成~');});这是一个可读流,它通过实现_read方法返回内容。WritableWritable实现了_write方法来接收写入的内容。constStream=require('stream');constwritableStream=Stream.Writable();writableStream._write=function(data,enc,next){console.log(data.toString());//每秒写入一次setTimeout(()=>{next();},1000);}writableStream.on('finish',()=>console.log('done~'));writableStream.write('AmenAvinebefore,');writableStream.write('阿东阿东绿刚发芽,');writableStream.write('阿东背着那沉重的壳,');writableStream.write('一步步往上爬。');writableStream.end();接收写入的内容,打印出来,调用next处理下一个写入的内容。这里调用next是异步的,可以控制频率。运行一段时间后,写入的内容确实可以正常处理:这就是可写流,通过实现_write方法来处理写入的内容。DuplexDuplex是可读可写的,同时实现_read和_write就可以了阿门前一颗葡萄藤,');this.push('阿东阿东绿刚发芽,');this.push('阿东背着那沉重的壳,');this.push('一步一步一步往上爬。')this.push(null);}duplexStream._write=function(data,enc,next){console.log(data.toString());next();}duplexStream.on('data',data=>console.log(data.toString()));duplexStream.on('end',data=>console.log('readdone~'));duplexStream.write('AmenAformertreeGrapevine,');duplexStream.write('阿东阿东绿色刚发芽,');duplexStream.write('阿东背着那沉重的壳,');duplexStream.write('一步步往上爬。');duplexStream.end();duplexStream.on('完成',data=>console.log('writtenone~'));集成了Readablestream和Writablestream的功能,这就是双工流Duplex。TransformDuplex流虽然是可读可写的,但是两者没有关系,有时需要对传入的内容进行转换再流出。这时候就需要转换流Transform了。Transform流需要实现_transformapi,我们实现反转内容的转换流:constStream=require('stream');classTransformReverseextendsStream.Transform{constructor(){super()}_transform(buf,enc,next){constres=buf.toString().split('').reverse().join('');this.push(res)next()}}vartransformStream=newTransformReverse();transformStream.on('数据',数据=>console.log(data.toString()))transformStream.on('end',data=>console.log('readdone~'));transformStream.write('阿门前的藤蔓');transformStream.write('阿东阿东绿色刚发芽');transformStream.write('阿东背着那沉重的壳');transformStream.write('一步步往上爬');transformStream.end()transformStream.on('完成',data=>console.log('writtenone~'));运行一段时间后,效果如下:流的暂停和流动我们从Readable流中获取内容,然后流入Writable流中,两边的_read和_write的实现分别实现了流动。背压,但是读写都是异步的。如果两者的利率不一致怎么办?如果Readable读取数据的速率大于Writable写入的速率,则缓冲区中会累积一些数据。如果缓冲的数据太多,它会爆炸,数据会丢失。如果Readable读取数据的速率低于Writable写入的速率怎么办?那也罢了,顶多中间有个空闲期。这种读取速率大于写入速率的现象称为“背压”,或“负压”。也很容易理解,写段压力比较大,写不进去,就会爆缓冲区,导致数据丢失。缓冲区大小可以通过readableHighWaterMark和writableHighWaterMark查看,为16k。解决背压时读写速度不一致的问题怎么解决?写不完,就暂停阅读。这样,越来越多的数据不会被读入,驻留在缓冲区中。可读流有一个readableFlowing属性,表示是否自动读入数据。默认为true,即自动读入数据,然后监听数据事件获取。当readableFlowing设置为false时,不会自动读取,需要通过read手动读取。readableStream.readableFlowing=false;letdata;while((data=readableStream.read())!=null){console.log(data.toString());}但是手动读取比较麻烦,我们还是可以使用自动流的方式,只需调用pause和resume即可暂停和恢复。当调用可写流的write方法时,会返回一个boolean值,表示target是写入还是放入buffer:true:数据已经写入targetfalse:target不能写入,暂时放置在buffer中判断为false时返回Pause,buffer清空后resume:constrs=fs.createReadStream(src);constws=fs.createWriteStream(dst);rs.on('data',function(chunk){if(ws.write(chunk)===false){rs.pause();}});rs.on('end',function(){ws.end();});ws.on('drain',function(){rs.resume();});这样就可以实现根据写入速率暂停和恢复读取速率的功能,解决了背压问题。管道是否存在背压问题?通常情况下,我们经常使用管道直接将Readable流连接到Writable流,但似乎从未遇到过背压问题。实际上,在管道内部已经做了读取速率的动态调整。constrs=fs.createReadStream(src);constws=fs.createWriteStream(dst);rs.pipe(ws);摘要流是传输数据时常用的概念,是传输内容的一部分,是文件读写和网络通信的基本概念。Node.js还提供了流API,包括Readable可读流、Writable可写流、Duplex双工流、Transform转换流。它们分别实现_read、_write、_read+_write、_transform方法返回和处理数据。创建Readable对象,可以直接调用Readable的api创建,然后重写_read方法,也可以继承Readable实现一个子类,然后实例化。其他流也是如此。(Readable可以很方便的和generator结合)当读速率大于写速率时,会出现“背压”现象,会爆缓冲区,造成数据丢失。解决方法是根据写入速率动态暂停写入。恢复可读流率。pipe没有这个问题,因为是内部处理的。流是掌握IO绕不开的概念,背压也是流很常见的问题。如果遇到数据丢失,可以考虑是否发生了背压。希望本文能帮助大家理清思路,真正掌握流!