当前位置: 首页 > 后端技术 > Node.js

Node.jsStream

时间:2023-04-03 17:21:38 Node.js

中Readable类的内部实现写在了开头。本次尝试分析探索Nodejs的Stream模块中Readable类的部分实现(可写流类似)。其中,readableflowmode中的pausedmode,即pausedmode,将用于解释源码上的实现。为什么不自然地分析flowing模式呢,因为这个模式是我们常用的,它的原理相对于paused模式来说比较简单(其实是因为作者总是喜欢关注一些角落和角落,并没有按套路出牌=.=),核心方法都是一样的,都是高手。感兴趣的童鞋可以自行查看完整源码。欢迎关注我的博客,不定时更新-生产者-消费者问题首先要明确Nodejs为什么要实现流,这就需要对生产者-消费者问题有一个清晰的概念。生产者消费者问题(英文:Producer-consumerproblem),又称有限缓冲区问题(英文:Bounded-bufferproblem),是多线程同步问题的一个经典案例。该问题描述了当两个线程共享一个固定大小的缓冲区(所谓的“生产者”和“消费者”)实际运行时会发生什么。生产者的主要作用是产生一定量的数据放入缓冲区,然后重复这个过程。与此同时,消费者也在消费缓冲区中的数据。这个问题的关键是保证生产者不会在缓冲区满的时候添加数据,消费者不会在缓冲区为空的时候消费数据。简单的说就是内存问题。与前端不同的是,后端对内存还是比较敏感的。比如读取一个文件,文件小的话没问题,但是如果文件是g呢?你读完了吗?这绝对不可取。宜以流的形式读一部分,写一部分慢慢来。PS:欢迎阅读为什么使用stream。实现一个可读流下面我们自己实现一个可读流,方便后续数据流转过程的观察:constReadable=require('stream').Readable;//实现一个可读流classSubReadableextendsReadable{构造函数(数据源,选项){超级(选项);this.dataSource=数据源;}//文档提出必须通过_read方法调用push来读取底层数据_read(){console.log('thresholdspecificationSize:',arguments['0']+'bytes')constdata=this.dataSource.makeData()letresult=this.push(data)if(data)console.log('添加数据大小:',data.toString().length+'bytes')console.log('缓存数据大小:',subReadable._readableState.length+'bytes')console.log('超过阈值限制或数据推送完成:',!result)console.log('=====================================')}}//模拟资源池constdataSource={data:newArray(1000000).fill('1'),//每次读取时推送一定量的数据makeData(){if(!dataSource.data.length)returnnull;返回数据源。data.splice(dataSource.data.length-5000).reduce((a,b)=>a+''+b)}//每次向缓存推送5000字节的数据};constsubReadable=newSubReadable(数据源);到目前为止可读e是我们实现的自定义可读流暂停模式。暂停模式有什么作用?先看一下整体流程:可读流会通过_read()从资源中读取数据到缓存池中,并设置一个阈值highWaterMark,将数据标记为缓存池大小的上限。这个阈值会浮动,最小值也是默认值16384。消费者监听到可读事件后,可以显式调用read()方法读取数据。通过注册可读事件触发暂停模式触发暂停模式:subReadable.on('readable',()=>{console.log('Cacheremainingdatasize:',subReadable._readableState.length+'byte')console.log('------------------------------------')})注册的时候可以找到可读事件之后,对流会将数据从底层资源推送到缓存中,直到超过阈值或加载完所有底层数据。开始消费数据并调用read(n);n=1000;先修改资源池大小数据:newArray(10000).fill('1')(用于打印数据),执行read(1000)每次读取1000字节资源读取资源:subReadable.on('readable',()=>{letchunk=subReadable.read(1000)if(chunk)console.log(`读取${chunk.length}字节数据`);console.log('缓存剩余数据大小:',subReadable._readableState.length+'byte')console.log('----------------------------------')})这样一来,读取的数据执行了两次,如果每次读取的字节都小于缓存中的数据,可读流就不会再从资源中加载新的数据。不带参数调用read()subReadable.on('readable',()=>{letchunk=subReadable.read()if(chunk)console.log(`读取${chunk.length}字节数据`);log('缓存剩余数据大小:',subReadable._readableState.length+'byte')console.log('-----------------------------------')})直接调用read()后,会逐步读取所有资源。总结以上,我们依次尝试了实现可读流后触发暂停模式时会发生什么情况。接下来笔者将探讨以下几个可能有疑问的点:为什么自己实现的可读流要实现_read()方法并在其中调用push()来触发暂停模式,缓冲池是怎么满的,为什么直接执行回调。不带参数调用read()和传入固定数据的区别就是为什么自实现的可读流需要实现_read()方法调用push()Readable.prototype._read=function(n){this.emit('错误',新的错误。错误('ERR_STREAM_READ_NOT_IMPLEMENTED'));};//只需定义接口Readable.prototype.read=function(n){...vardoRead=state.needReadable;如果(doRead){this._read(state.highWaterMark);}}当我们调用subReadable.read()时,就会执行上面的代码,可以发现在源码中只为_read()定义了一个接口,并没有具体的实现。如果我们不自己定义,就会报错。同时read()会执行它,通过调用push()从资源中读取数据,并传入highWaterMark。这个值你可以用也可以不用,因为_read()是我们自己实现的。Readable.prototype.push=function(chunk,encoding){...returnreadableAddChunk(this,chunk,encoding,false,skipChunkCheck);};从代码可以看出,将底层资源推送到缓存的核心操作是通过push,从语义上也可以看出,在push方法的最后会进行添加新数据的操作。由于后面的方法中有很多嵌套的方法,就不一一展示了。让我们看看最后一个调用的方法://readableAddChunk最终会调用addChunkfunctionaddChunk(stream,state,chunk,addToFront){...state.buffer.push(chunk);//数据被推送到缓冲区if(state.needReadable)//判断这个属性值,看是否触发可读事件emitReadable(stream);maybeReadMore(stream,state);//可能会向缓存中推送更多数据}我们可以看到,在方法调用的最后,将资源数据推送到了缓存中。同时会判断needReadable属性的值,看是否触发可读回调事件。而这也为我们分析为什么注册可读事件后会执行回调做铺垫。最后,调用maybeReadMore()是填充缓冲池的方法。暂停模式触发后缓冲池是如何满的首先我们看一下源码中如何绑定事件:Readable.prototype.on=function(ev,fn){if(ev==='data'){...}elseif(ev==='readable'){conststate=this._readableState;state.needReadable=true;//设置属性为true并触发可读回调...process.nextTick(nReadingNextTick,this);}};functionnReadingNextTick(self){self.read(0);}//执行后read(0)=>_read()=>push()=>addChunk()//=>maybeReadMore()maybeReadMore()当缓存池的存储大小小于阈值时,它会一直调用read(0)而不读取数据,但会一直将底层资源推送到缓存中:functionmaybeReadMore_(stream,state){...if(state.lengthstate.highWaterMark)state.highWaterMark=computeNewHighWaterMark(n);如果(n<=state.length)返回n;//Don'thaveenoughif(!state.ended){//传输未结束为falsestate.needReadable=true;返回0;}returnstate.length;}直接调用read()时,n参数为NaN。流模式时,n为bufferheader数据的长度,否则为整个buffer数据长度。如果给read(n)传入一个数字,如果大于当前的hwm,可以发现会重新计算一个hwm。同时,如果缓存的数据小于请求的数据量,则设置state.needReadable=true;并返回0;总结一下第一次尝试整理源码的思路,一路写下来,发现有很多想说的却不知道怎么连贯地整理出来=。=由于代码细节有些不清楚,不过最终还是提炼了一个核心思路:核心方法:Readable.prototype.read()Readable.prototype.push();emitReadable()可以在push中执行;核心属性:this.needReadable:使用该属性决定是否触发回调核心思想:将数据推送到缓存和读取缓存数据的操作由read()控制(因为read内部同时实现了push和howMuchToread(),区别是前者是read(0),即只push不读;后者是read()或read(n);注册readable事件后,read(0)资源会被push到缓存中直到达到highWaterMark,才会触发回调函数,如果执行read(),相当于每次都把缓存中的数据全部取出来,如果任何时候缓存中没有数据,就可以只继续从资源中获取,直到所有数据都取出来读取,如果执行read(n)(n