当前位置: 首页 > 后端技术 > Node.js

流的分析与实现

时间:2023-04-03 14:39:40 Node.js

流的定义流是一个抽象的概念,形象地描述了数据的流动和变化。具体来说,在node中,stream是一个处理数据的抽象接口,它继承了EventEmitter。通过这个接口,我们可以控制流的切换,流向等等。更加直观直观,类似于我们在linux上使用shell通过管道和链接来处理各个部分。下面是我写的一个命令,用来过滤版本并导出到文件中。流的分类Readable(可读流)Writable(可写流)Duplex(可读可写流)Transform(读写过程中可以修改和改变的Duplex流)流按功能大致分为以上四类。应用的场景很多,如下图所示(来源:参考链接2)。下面,我将根据流的分类列出一些演示应用示例。Readable可读流可以接受各种数据源,例如控制台输入、文件和字符串。等等,介绍里说了,它是一个抽象接口,可以面向各种形式的输入。这里有一些例子。文件流require('fs').createReadStream('./1.txt',{encoding:'utf8'}).on('data',(data)=>{console.log(data)})//输出hellojsdt来解释为什么要用流来读取。直接使用fs.readFile不是更方便吗,因为readFile是一个全局操作,会把所有的文件读入内存进行处理,所以如果文件很大,程序会很卡,甚至会报错。标准输入流process.stdin.setEncoding('utf8');process.stdin.on('data',(data)=>{console.log('output:'+data)})节点运行代码,然后输入hellojsdt输出:hellojsdt说明做acm的时候会用到这个,或者自己写一些交互应用时的普通数据流。let{Readable}=require('stream')letutil=require('util')classTestextendsReadable{constructor(){super()this.dataSource=5}_read(){if(this.dataSource-->0){this.push(this.dataSource+'');}else{this.push(null);}}}letcounter=newTest();counter.on('data',function(data){console.log(data.toString())});Output:4321表示重写_read方法,因为定义了输入逻辑。在上面的例子中,它是在自己的逻辑中生成的数据源。可写文件流letdataSource='hellojsdt',i=0;(function(){letws=require('fs').createWriteStream('./1.txt',{encoding:'utf8'})letflag=true;while(flag&&i{})}process.nextTick(function(){console.log(arr.toString())})//输出1、2、3表示上面重写了流的write方法,可以自定义写入逻辑Duplexrequire('net').createServer(socket=>{socket.on('data',data=>{console.log('clientmessage'+data);socket.write("servermessage"+'helloclient');})}).listen(8080,()=>{})表示socket是一个可写流,可以向客户端发送信息,作为可读流,可以监听数据事件,接收来自客户端的信息。Transformlett=require('stream').Transform({transform(chunk,encoding,cb){this.push(chunk.toString().toUpperCase());cb();}});process.stdin.pipe(t).pipe(process.stdout);//输入abc//输出abc来说明上面使用的转换流,在终端上实现小写输入,对应大写输出函数流数据分类binarymodeobjectmode可以指定创建流时的配置,objectMode默认为false,设置为true切换到对象模式。Binary是buffer模式,可读或可写流会将数据缓存在buffer中。流量分析通过上面的介绍,我们已经明确了流量的定义,并且按照功能对流量进行了分类。接下来,我来分析一下。一般来说,流的各种形式之间的转换和传输的底层是二元的,具体到使用形式。有buffer、string等。首先,让我们详细谈谈可读流。可读流有两种模式,默认是暂停模式。Flowing根据初始化配置自动读取数据,并通过观察者模式直接将数据提供给订阅者。Paused显式调用流的read方法读取数据。如果我们想切换到流模式,我们可以监听数据事件。或者调用stream.resume(),stream.pipe()方法。可读流源码分析//可读流入口,根据配置返回一个可读流fs.createReadStream=function(path,options){returnnewReadStream(path,options);};//实现原理为ReadStream.prototype.__proto__=Readable.prototype,可以继承Readable上的一些方法util.inherits(ReadStream,Readable);fs.ReadStream=ReadStream;functionReadStream(path,options){//非新的调用方式,直接返回一个实例if(!(thisinstanceofReadStream))returnnewReadStream(path,options);options=copyObject(getOptions(options,{}));if(options.highWaterMark===undefined)//highWaterMark默认值为64k,设置流模式下缓冲区的大小options.highWaterMark=64*1024;Readable.call(这个,选项);handleError((this.path=getPathFromURL(path)));//文件描述符,根据这个句柄找到文件this.fd=options.fd===undefined?空:选项.fd;//打开文件要做的标志,默认是'r'this.flags=options.flags===undefined?'r':选项。标志;//用于设置文件模式(权限和粘滞位),仅在创建文件时使用。this.mode=options.mode===未定义?0o666:选项.模式;//开始读取位置this.start=options.start;//阅读结束位置(!!!包括结束位置)this.end=options.end;/***如果autoClose为false,文件描述符将不会关闭,即使有错误。*要求程序负责关闭它并确保没有文件描述符泄漏。*如果autoClose设置为true(默认),文件描述符将在错误或结束时自动关闭*/this.autoClose=options.autoClose===undefined?真:选项。自动关闭;this.pos=this.start;}//适合传入句柄,比如fd:0,这样就不是文件,而是控制台输入的数据if(typeofthis.fd!=='number')this.open();this.on('end',function(){if(this.autoClose){this.destroy();}});}//打开文件并触发open事件。只有打开了才能读取,所以在回调中触发open事件,看下一步ReadStream.prototype.open=function(){varself=this;fs.open(this.path,this.flags,this.mode,function(er,fd){self.fd=fd;self.emit('open',fd);//开始数据流.self.read();});};Readable.prototype.read=function(n){//read(0)时,如果缓存中已经有数据,则触发readable事件,相当于刷新缓存.否则触发结束事件if(n===0&&state.needReadable&&(state.length>=state.highWaterMark||state.ended)){if(state.length===0&&state.ended)endReadable(this);否则emitReadable(this);返回空值;}//如果可读流已经通过终止符(null)并且缓冲区中没有剩余数据,则结束可读流if(n===0&&state.ended){if(state.length===0)endReadable(这个);返回空值;}//如果当前缓冲区的数据大小为空,或者没有超过设置的警告线,则进行一次数据读取。如果(state.length===0||state.length-nstate.highWaterMark)state.highWaterMark=computeNewHighWaterMark(n);如果(n<=state.length)返回n;}//经过上面的一系列准备,真正的读操作才会开始fs.read(this.fd,pool,pool.used,toRead,this.pos,(er,bytesRead)=>{if(bytesRead>0){this.bytesRead+=bytesRead;}this.push(b);});};//上面的整个过程就是暂停的进程,这里的流模式不同,如下图//如果监听到数据事件,会调用this.resume()启动流模式Readable.prototype.on=function(ev,fn){constres=Stream.prototype.on.call(this,ev,fn);if(ev==='data'){//如果流没有明确暂停,则在下一个刻度开始流动if(this._readableState.flowing!==false)this.resume();}}//流模式下,流内部自动触发数据事件,循环读取数据functionflow(stream){conststate=stream._readableState;调试('流',state.flowing);while(state.flowing&&stream.read()!==null);}//然后触发数据事件,循环发射数据stream.emit('data',chunk);小结以上就是可读流的源码分析,小结下面整理一下重点部分。通过ReadStream创建流时,会默认触发可读事件,进入暂停模式。这个时候内部维护了一个buffer。读取操作在可读事件回调逻辑中执行。使用howMuchToRead方法计算实际读取次数。如果现有数据小于highWaterMark,内部会执行this._read(state.highWaterMark)操作,回调中会执行push操作。Push调用readableAddChunk将数据放入内部维护的缓存中,否则从fromList中读取缓存中的数据,然后返回而如果监听到数据事件,则会调用this.resume(),如代码所示,将flow状态设置为flowing模式,然后resume()->resume_()->flow的调用顺序()会循环执行flow方法读取Data,触发data事件,完成数据的自动读取,然后发送给调用者,整个过程不断循环。上面的比较值是flow模式和paused模式的区别。如果流模式在addChunk,函数addChunk(stream,state,chunk,addToFront){if(state.flowing&&state.length===0&&!state.sync){stream.emit('data',chunk);stream.read(0);}}会自动发出数据,不经过缓存,暂停模式会经过内部缓存机制。根据上面node源码的分析过程,下面图解说明整个过程。一个自己实现的可读流和可写流源码分析//1:首先第一步根据createWriteStream传入的参数进行初始化//2:调用写操作Writable.prototype.write=function(chunk,encoding,cb){if(state.ended)//写到最后会发出错误事件writeAfterEnd(this,cb);elseif(validChunk(this,state,chunk,cb)){//在验证数据chunk有效的情况下才会进行后续的写逻辑state.pendingcb++;ret=writeOrBuffer(this,state,chunk,encoding,cb);}returnret;};functionwriteOrBuffer(stream,state,chunk,encoding,cb){chunk=decodeChunk(state,chunk,encoding);if(chunkinstanceofBuffer)encoding='buffer';varlen=state.objectMode?1:块.长度;state.length+=len;//实时更新缓冲区长度varret=state.lengthclearBufferuntil直到数据缓存中的数据耗尽。清洗后,后续调用write的返回值ret为false,从而继续写入,循环上述整个过程,直到写入数据源。总的来说,因为可写流内部只有一种状态,所以复杂度比可读流要低,整个过程也比较清晰,不是图形化的流程。一个基于v8.9.4参考http://nodejs.cn/api/https://medium.freecodecamp.o...