当前位置: 首页 > 后端技术 > Node.js

节点流(上篇)——可读性

时间:2023-04-03 15:31:46 Node.js

题外话本文整合了网上多篇文章(整合点已设置超链接,点击可直接了解原文),仅供分享给大家,并且更容易理解了解每个流程的初始流程。本人也是node新手,如有描述错误或理解错误,请指出。可读性先来安利一下思路,理清逻辑:)。可读缓冲区:这里的read是一个形容词,指的是可读流临时存放数据的缓冲区(只能是字符串或Buffer,不能是数字)。(读缓冲区就像水电站,感觉这样描述比较好理解flowing和paused模式。)Flowingmode:即流水模式,就像打开水电站的闸门一样。上游水和下游完全连接,直到上游源的数据用完。Pausedmode:暂停模式,就像水电站的闸门只有你指定才会打开(使用stream.read())。然而,当你用read()打开水闸时,却是灵异现象——水电站的水瞬间被排干,上游的水还没来得及填满水电站。然后自动关闭闸门,等待你的下一个“赞助人”read()。_read:上游水源通过_read中的push和unshift方式流入水电站。Event查看原文readable:当一个数据块可以从流中读取时发出。其对应的处理器没有参数,可以在处理器中调用read([size])方法读取数据。data:当有数据可读时发出。它对应的处理器有一个代表数据的参数。如果你只是想快速读取一个流的数据,将处理器与数据相关联是最方便的方式。处理器的参数是一个Buffer对象。如果调用Readable的setEncoding(encoding)方法,处理器的参数是一个String对象。end:读取数据时发出。相应的处理程序没有参数。close:当底层资源(例如文件)关闭时发出。并非所有可读流都会发出此事件。相应的处理程序没有参数。error:当接收到的数据发生错误时发出。对应的processor参数是Error的一个实例,其message属性描述了错误的原因,stack属性保存了错误发生时的堆栈信息。Function查看原文read([size]):如果给read方法传入一个size作为参数,则返回指定的数据量,如果数据不足,则返回null。如果不给read方法传递参数,它会返回内部缓冲区中的所有数据。如果没有数据,则返回null,这可能表明遇到了文件末尾。read返回的数据可能是Buffer对象,也可能是String对象。setEncoding(encoding):为流设置一种编码格式,对读取的数据进行解码。调用此方法后,read([size])方法返回一个String对象。pause():暂停可读流,不再发出数据事件resume():恢复可读流,继续发出数据事件pipe(destination,[options]):将这个可读流的输出传递给由指定的可写流destination,两个流形成一个管道。options是一个JS对象。此对象具有默认值为true的布尔结束属性。当end为true时,Writable会在Readable结束时自动终止。注意,我们可以将一个Readable与多个Writable连接起来,形成多个pipeline,每个Writable可以获得相同的数据。此方法返回目的地。如果destination本身是一个Readablestream,则可以级联调用pipe(例如,我们使用gzip压缩和解压时会出现这种情况,稍后会提到)。unpipe([destination]):端口和指定目的地之间的管道。当没有传递目的地时,断开连接到此可读流的所有管道。流模式和暂停模式的切换查看原始流程从默认的暂停模式切换到流模式可以通过以下方式:通过添加数据事件监听来启动数据监听调用resume()方法启动数据流调用pipe()方法将数据传输到另一个可写流从流模式切换到暂停模式有两种方法:当流没有pipe()时,调用pause()方法暂停流。pipe()时需要移除所有数据事件监听,然后调用unpipe()方法触发准备数据(_read)方法datalistenerreadablelistenerread()——如果当前缓冲区为空,或者缓冲区没有不超过我们设定的最大值,就可以继续准备数据;如果此时正在准备数据(_read())或者读取已经完成(push(null)),则放弃数据准备。Workflow查看原文这里我要比较几句哈哈。以下言论来自大神写的原文,很细腻,也包含源码解读,但不适合刚接触流量的同志,澄清后更适合整个思路~paused模式:1.在paused模式下,读取所有buffer的长度;如果读取的字节数(n)大于bufferset的最大值,则适当扩大buffer的大小(默认16k,最大8m);如果读取的长度大于当前缓冲区的大小,则设置needReadable属性并为下一次读取准备数据。2、如果当前buffer为空,或者buffer没有超过我们设置的最大值,那么就可以继续准备数据;如果你正在准备数据(_read())或者读完(push(null)),那么就放弃准备数据。3.对于_read这个私有方法,文档中有专门的说明。自定义的Readable实现类需要实现该方法。在该方法中,手动向Readable对象的读取缓冲区中添加数据,然后读取Readable。可以理解为_read函数是读取数据前的准备工作(datapreparation),针对流的实现者。Flowing模式:1.对于flowing模式下的读取,每个只读缓冲区中第一个buffer的长度2.对于_read这个私有方法,文档中有专门的说明,自定义的Readable实现类需要实现这个方法,在该方法中手动向Readable对象的读取缓冲区中添加数据,然后读取Readable。可以理解为_read函数是读取数据前的准备工作(datapreparation),针对流的实现者。示例暂停模式://这是一个将存储多个json字符串的txt文件读入json的示例conststream=require('stream');constfs=require('fs');constutil=require('util');functionJSONLineReader(source){stream.Readable.call(this);this._source=来源;this._foundLineEnd=false;这个._buffer='';source.on('readable',function(){//监听当source准备好了,然后我们可以使用read()或者readable监听器来触发JSONLineReader的_read方法this.read();//this.on('readable',function(data){//console.log('readable');//});}.bind(this))}util.inherits(JSOLLineReader,stream.Readable);JSOLLineReader.prototype._read=function(大小){var块;可变线;变种行索引;变量结果;如果(this._buffer.length===0){chunk=this._source.read();this._buffer+=块;//取一次,看看什么时候推送null}lineIndex=this._buffer.indexOf('\n');如果(lineIndex!==-1){line=this._buffer.slice(0,lineIndex);如果(线){结果=JSON.parse(行);this._buffer=this._buffer.slice(lineIndex+1);this.emit('object',result);util.inspect(result))this.push(util.inspect(result));}else{this._buffer=this._buffer.slice(1);}}}letinput=fs.createReadStream(__dirname+'/json-lines.txt',{encoding:'utf8'});varjsonLineReader=newJSONLineReader(input);jsonLineReader.on('object',function(obj){console.log('pos:',obj);})/*json-lines.txt{"success":false,"code":501}{"success":true,"code":202}{“成功”:假,“代码”:503}{“成功”:真,“代码”:204}{“成功”:假,“代码”:505}{“成功”:真,“代码”:206}{"success":false,"code":507}{"success":true,"code":208}{"success":false,"code":509}*/flowing模型:letstream=require('stream');letutil=require('util');util.inherits(flowingReadableDemo,stream.Readable);functionflowingReadableDemo(opt){stream.Readable.call(this,opt);这个。报价=[“yessdasdsa”,“noasdasdas”,“也许”];this._index=0;}flowingReadableDemo.prototype._read=function(){if(this._index>=this.quotes.length){this.push(null);}else{this.push(this.quotes[this._index]);这个._index+=1;}};letr=newflowingReadableDemo();r.on('data',function(data){console.log("Callbackread:"+data.toString());//在流动状态下,我们不不需要执行read,只需要设置数据事件处理程序或者设置导流目标管道即可});r.on('end',function(data){console.log("Nomoreanswers.");});