当前位置: 首页 > 后端技术 > Node.js

nodejs篇-Stream

时间:2023-04-03 20:43:23 Node.js

什么是流?流是一种抽象的数据结构。想象一下水流,当在水管中流动时nodejsstream接口:Stream有四种流类型:Readable——可读操作。可写-可写操作。Duplex-读写操作。Transform-操作写入数据,然后读出结果。所有Stream对象都是EventEmitter的实例。常用的事件有:data——有数据读取时触发。end-当没有更多数据可读时触发。error-在接收和写入过程中发生错误时触发。finish-当所有数据都已写入底层系统时触发。可读流createReadStream可读流是产生数据供程序消费的流。常见的数据生产方式有读取磁盘文件、读取网络请求内容等。constfs=require("fs");constpath=require("path");letrs=fs.createReadStream(path.resolve(__dirname,"./read.txt"))//保存读取的数据constdata=[];//读数据rs.on("data",(chunk)=>data.push(chunk));//读完rs.on("end",()=>console.log(Buffer.concat(data).toString()))//错误rs.on("error",(err)=>console.log(`${err.path}文件路径错误或损坏`));可写流createWriteStream可写流是对数据流向设备的抽象,通常将可读流读取的数据通过可写流写入。constfs=require("fs");constpath=require("path");letws=fs.createWriteStream(path.resolve(__dirname,"./write.txt"))ws.write("使用流来write输入文本数据...\n");ws.write(Buffer.from("使用Stream写入二进制数据...\n"))ws.write("END.");ws.end();ws.on("完成",()=>console.log("写入完成"));PipelineStream管道提供了一种输出流到输入流的机制。通常我们习惯从一个流中获取数据,然后将数据传递给另一个流。varfs=require("fs");//创建一个可读流varreaderStream=fs.createReadStream('input.txt');//创建一个可写流varwriterStream=fs.createWriteStream('output.txt');//管道读写操作//读取input.txt文件的内容,并将内容写入output.txt文件readerStream.pipe(writerStream);使用管道流优化大文件在网络中的传输,通过读取Filetocontent,完成后再响应客户端。大文件消耗内存并等待太久。constfs=require('fs');constserver=require('http').createServer();服务器。on('request',(req,res)=>{fs.readFile('./big.file',(err,data)=>{if(err)throwerr;res.end(data);});});服务器.listen(8000);利用http请求中request和response中可读可写流的特性,通过管道传输constfs=require('fs');constserver=require('http').createServer();server.on('request',(req,res)=>{constsrc=fs.createReadStream('./big.file');src.pipe(res);});server.listen(8000);读流、写流、双工流所有可以读数据的流都继承自stream.Readable,所有可以写的流都继承自stream.Writable。Readableconst{Readable}=require("stream")letarr=["a","b","c","d"];classMyReadStreamextendsReadable{_read(){if(arr.length)返回这个。push(arr.splice(0,1)[0])//returnnull表示读取returnthis.push(null)}}letrs=newMyReadStream()letdata=[]rs.on("data",(chunk)=>data.push(chunk))rs.on("end",()=>{letresult=Buffer.concat(data).toString()console.log(result)})Writableconstfs=require("fs")const{resolve}=require("path")const{Writable}=require("stream")classMyWriteStreamextendsWritable{_write(chunk,encoding,callback){fs.appendFile(resolve(__dirname,"./write.txt"),chunk,()=>{//每3秒写入一次//callback相当于clearBuffer()清除缓存的读取数据setTimeout(callback,3000)})}}letws=newMyWriteStream()ws.write("New")ws.write("Add")ws.write("Add")ws.write("of")ws.write("Content")DuplexDuplexStreamDuplexStream都可以读并写const{Duplex}=require("stream")classMyDuplexextendsDuplex{_read(){console.log(“读取”)}_write(块,编码,回调){console.log(块)回调()}}letduplex=newMyDuplex()duplex.on(“数据”,(块)=>console.log("ondata"))duplex.write("1")duplex.write("2")Transform_transform可以拦截写入的流内容const{Transform}=require("stream")classMyTransformextendsTransform{_transform(chunk,encoding,回调){this.push("测试");//多次推送内容this.push(chunk);//this.push(null);打回来();}}letmyTransform=newMyTransform();myTransform.on("data",(chunk)=>{console.log(chunk.toString());})myTransform.write("这是写的内容");修改输入控制台的输出内容:const{Transform}=require("stream")classMyTransformextendsTransform{_transform(chunk,encoding,callback){//大小写转换为输入内容this.push(chunk.toString().toUpperCase());this.push("\n\r")callback()}}letmyTransform=newMyTransform();process.stdin.pipe(myTransform).pipe(process.stdout);参考链接Node.jsStreams:你需要知道的一切Node.jsStream(流)廖雪峰JavaScript教程