当前位置: 首页 > 后端技术 > Node.js

Node预备篇(五)---streams模块

时间:2023-04-03 21:16:37 Node.js

stream在HTTP模块章节的静态文件服务器中,我们看到了两个可写流的例子,即服务器可以向响应对象写入数据,也可以向Write请求返回的请求对象中的数据。可写流是节点接口中广泛使用的一个概念。所有可写流都有一个用于传递字符串或缓冲区对象的写入方法;还有一个结束流的方法。如果给定一个参数,end将在流关闭之前输出指定的一段数据。两种方法都可以接受回调函数作为参数。但是我们搭建的静态文件服务器有个问题,就是我们直接用readfire方法读取文件夹里的文件,不管文件大小,小一点的文件还好,但是如果文件是一个几个G大小的电影,占用的内存空间需要几个G,直到文件完全写入网络另一端才能释放内存,这对于我们的电脑来说是无法承受的。其实,对于文件,我们不需要一次完成文件传输。我们可以一点点读写,一点一点完成文件传输。当然,不能马上读一点,写一点。传输速度可能不同。比如文件读的快,写的慢。计算机可以读取一定量的数据,写入内存中,以运行其他任务,这就需要一个缓冲区来提高运行效率。stream的目的:控制内存使用(控制内存使用不超过watermark),将大文件分解成小块,然后一点一点的传输,减少内存压力,协调不同阶段的处理速度不同流的分类:可读流可写流双工流转换流例如:我们要用通常的方式传输一部电影,也许我们会这样写:constfs=require('fs')varfile='movie.mp4'fs.readFile(file,(err,data)=>{if(err){console.log(err)}else{fs.writeFile('movie-1',data,()=>{console.log('done')})}})很简单,就是先读再写,这个过程是一次性完成的,也就是说如果电影的大小是1G,内存占用如果在流中写入,则至少为一个G:constfs=require('fs')varfile='D:/Users/movie.mp4'varrs=fs.createReadStream(file)varws=fs.createWriteStream('D:/Users/movie-1.mp4',{highWaterMark:65536})//这里的highWaterMark可以指定缓冲内存大小,默认大小为65536字节,即64krs.on('data',data=>{if(ws.write(data)===false){//这里的write函数会返回一个布尔值,false表示buffer内存已满,无法继续写入rs.pause()//如果内存已满,rs暂停}})ws.on('drain',()=>{//ws内存耗尽rs.resume()//rs恢复执行})rs.on('end',()=>{ws.end()})如果读取速度快于写入速度且缓存已满,rs会中途暂停读取,等缓存中的数据写入后继续读取。这是一个简单的流程注意write函数虽然返回false,告诉内存不能写了,但是如果你还是写如果输入了,内存还是会接收到数据,而不是丢掉。当然,这会导致内存占用过高。现在有一种更简单更常用的写法,就是pipeconstfs=require('fs')varfile='D:/Users/movie.mp4'varrs=fs.createReadStream(file)varws=fs.createWriteStream('D:/Users/movie-1.mp4',{highWaterMark:65536})rs.pipe(ws)//pipe以流水线形式编写,顺序运行。如果有的话,可以写成://rs.pipe(gzip).pipe(ws).pipe(conn)当然,pipe的内部实现要比上面的代码复杂很多,但是主要我们要做的就是我们刚才所做的。当各个链路的流量不同时,通过调整流量来控制内存占用。练习使用可读流文件读取特定路径r')//打开路径路径文件varfileStat=fs.statSync(path)varfileSize=fileStat.sizevarposition=0returnnewReadable({read(size){varbuf=Buffer.alloc(1024)//1024字节在buf上可用if(position>=fileSize){this.push(null)fs.close(fd,(err)=>{console.log(err)})}else{fs.read(fd,buf,0,1024,位置,(err,bytesRead)=>{//从fd文件的位置开始,读取1024个字节放在buf的第0个位置if(err){console.log(err)}if(bytesRead<1024){this.push(buf.slice(0,bytesRead))}else{this.push(buf)}})position+=1024}}})}在节点中运行代码$node>cfrs=require('./file-read-stream.js')>rs=cfrs('./http-server.js')>rs.on('data',d=>console.log(d.toString()))这样前面HTTP模块案例的代码section可以读现在,如果我们加上写函数,就可以实现复制文件的功能了。exports.createWriteStream=functioncreateWriteStream(path){varfd=fs.openSync(path,'a+')varposition=0returnnewWritable({write(chunk,encoding,done){fs.write(fd,chunk,0,chunk.length,position,()=>{done()})position+=chunk.length}})}$node>const{createReadStream,createWriteStream}=require('./file-read-stream.js')>createReadStream('./http-server.js').pipe(createWriteStream('./http-server222.js'))运行这段代码,文件夹中会出现一个http-serverhttp-server222的拷贝文件回到昨天的静态文件服务器,我们需要将readFile改为streamconsthttp=require('http')constfs=require('fs')constfsp=fs.承诺constpath=require('path')constmime=require('mime')constport=8090constbaseDir=__dirnameconstserver=http.createServer(async(req,res)=>{vartargetPath=decodeURIComponent(path.join(baseDir,req.url))console.log(req.method,req.url,baseDir,targetPath)try{varstat=awaitfsp.stat(targetPath)if(stat.isFile()){try{vartype=mime.getType(targetPath)if(type){//如果文件类型在mimeMap对象中,则使用对应的解码方法res.writeHead(200,{'Content-Type':`${type};charset=UTF-8`})}else{//如果不是,则解码为流res.writeHead(200,{'Content-Type':`application/octet-stream`})}fs.createReadStream(targetPath).pipe(res)}catch(e){res.writeHead(502)res.end('502内部服务器错误')}}elseif(stat.isDirectory()){varindexPath=path.join(targetPath,'index.html')try{awaitfsp.stat(indexPath)vartype=mime.getType(indexPath)if(type){res.writeHead(200,{'Content-Type':`${type};charset=UTF-8`})}else{res.writeHead(200,{'Content-Type':`application/octet-stream`})}fs.createReadStream(indexPath).pipe(res)}catch(e){if(!req.url.endsWith('/')){res.writeHead(301,{'Location':req.url+'/'})res.end()return}varentries=awaitfsp.readdir(targetPath,{withFileTypes:true})res.writeHead(200,{'Content-Type':'text/html;charset=UTF-8'})res.end(`${entries.map(entry=>{varslash=entry.isDirectory()?'/':''return`

<ahref='${entry.name}${slash}'>${entry.name}${slash}
`}).join('')}`)}}}catch(e){res.writeHead(404)res.end('404NotFound')}})server.listen(port,()=>{console.log(port)})