当前位置: 首页 > 后端技术 > Node.js

初探stream

时间:2023-04-03 17:30:34 Node.js

Node中的Stream有以下四种类型:Readable——可读操作Writable——可写操作Duplex——可读可写操作Transform——操作写入数据,然后读取结果Readablestream(可读流)可读流接口是对您正在阅读的数据源的抽象。换句话说,来自Readable流的数据将不会分发数据,直到您指示它已准备就绪。可读流有两种模式:流动模式和暂停模式。在流动模式下,数据会尽快从底层系统读取并提供给您的程序。在暂停模式下,您必须显式调用stream.read()来读取数据。暂停模式是默认模式。有几种方法可以将流切换为流动模式。letfs=require('fs');/***所有初始工作模式为paused的Readablestreams可以通过以下三种方式切换到flowing模式:监听'data'事件并调用stream.resume()方法调用流。pipe()方法将数据发送到Writable*/letrs=fs.createReadStream('./1.txt',{highWaterMark:3});/*269stream.emit('data',chunk);stream.read(0);rs.on('data',function(data){console.log(data);});rs.on('end',function(){console.log('end');});*///当你监听到可读事件时,会进入暂停模式读取缓冲区中的文件conststate=this._readableState;//self.read(0);只填充缓存,但不发出数据事件,但发出stream.emit('readable');事件//this._read(state.highWaterMark);每次调用底层方法读取,读取3个字节3console.log(rs._readableState.length);//如果不加参数则读取,表示读取整个buffer数据//读取一个字段,如果可读流发现要读取的字节小于或等于缓存字节大小,则直接返回letch=rs.read(1);console.log(ch);console.log(rs._readableState.length);/*ch=rs.read(1);console.log(ch);console.log(rs._readableState.length);*///读完指定的字节后,如果可读流发现剩余字节小于最大水位线,会立即重新读取以填充最大水位线setTimeout(function(){console.log(rs._readableState.length);},200)});可写流(Writablestream)该方法将数据写入底层系统并调用给定的回调。返回值指示您是否应立即继续写入。如果要在内部缓存数据,则返回false。否则返回真。返回值仅供参考。即使返回false,你也可以继续写。但是写入缓存在内存中,所以不要过度。最好的方法是在写入数据之前等待drain事件。letfs=require('fs');letws=fs.createWriteStream('2.txt',{flags:'w',mode:0o666,start:0,highWaterMark:3});letcount=9;函数write(){letflag=true;//缓冲区未满//写入方式是同步的,但是写入文件的过程是异步的。我们的回调函数将在实际写入文件后执行while(flag&&count>0){console.log('before',count);flag=ws.write((count)+'','utf8',(function(i){return()=>console.log('after',i);})(count));数数-;}}write();//987//监听缓冲区清除事件ws.on('drain',function(){console.log('drain');write();//654321});ws.on('error',function(err){console.log(err);});//如果不再需要写入,可以调用end方法关闭写入流。一旦调用了end方法,就不能再写入ws.end();//writeafterend//ws.write('x');双工流(Duplexstreams)同时实现了Readable和Writable接口。用法见下文let{Duplex}=require('stream');letindex=0;lets=Duplex({read(){if(index++<3)this.push('a');elsethis.push(null);},write(chunk,encoding,cb){console.log(chunk.toString().toUpperCase());cb();}});//process.stdin标准输入流//过程。stdout标准输出流process.stdin.pipe(s).pipe(process.stdout);转换流(Transformstreams),其输出是根据输入计算的。它实现了Readable和Writable接口。有关使用详细信息,请参见下文。let{Transform}=require('stream');//transformationstream是实现数据转换lett=Transform({transform(chunk,encoding,cb){this.push(chunk.toString().toUpperCase());cb();}});process.stdin.pipe(t).pipe(process.stdout);让{Transform}=require('stream');letfs=require('fs');letrs=fs.createReadStream('./user.json');//Buffer放在普通流中,对象放在对象流中lettoJSON=Transform({readableObjectMode:true,//可以将对象放入可读流中transform(chunk,encoding,cb){//将this.push(JSON.parse(chunk.toString()))放入可读流中的缓冲区;}});letoutJSON=Transform({writableObjectMode:true,//可以将对象放入可读流transform(chunk,encoding,cb){console.log(chunk);cb();}});rs.pipe(toJSON).pipe(outJSON);