可写流(WritableStream)可写流是对向‘目的地’写入数据的一种抽象。可写流的原理其实和可读流类似。当数据到来时,它会被写入缓冲池。当写入速度很慢或写入暂停时,数据流会进入队列池进行缓存。当然,即使缓存池满了,剩余的数据也存在于内存中。可写流的简单用法如下:letfs=require('fs');letpath=require('path');letws=fs.createWriteStream(path.join(__dirname,'1.txt'),{highWaterMark:3,autoClose:true,flags:'w',encoding:'utf8',mode:0o666,start:0,});leti=9;functionwrite(){letflag=true;while(i>0&&flag){flag=ws.write(--i+'','utf8',()=>{console.log('ok')});console.log(flag)}}write();//drain仅在缓冲区已满并被消耗时触发ws.on('drain',function(){console.log('drain')write();});实现原理现在让我们实现一个简单的可写流来研究一下可写流的内部原理。可写流有许多类似于可读流的方法。我们不会在这里重复它们。首先,我们必须有一个构造函数来定义一些基本的选项属性,然后调用一个open方法打开文件,还有一个destroy方法处理关闭逻辑letEventEmitter=require('events');letfs=require('fs');classWriteStreamextendsEventEmitter{路径,选项){超级();this.path=路径;this.highWaterMark=options.highWaterMark||16*1024;this.autoClose=op选项.autoClose||真的;this.mode=options.mode;this.start=options.start||0;this.flags=options.flags||'w';this.encoding=options.encoding||'utf8';//可写流必须有一个缓冲区,在写入文件时,必须将内容写入缓冲区//在源代码中,它是一个链表=>[]this.buffers=[];//识别是否在写this.writing=false;//是否满足触发drain事件this.needDrain=false;//记录写入位置this.pos=0;//记录缓冲区的大小this.length=0;这个.open();}destroy(){if(typeofthis.fd!=='number'){returnthis.emit('close');}fs.close(this.fd,()=>{this.emit('close')});}open(){fs.open(this.path,this.flags,this.mode,(err,fd)=>{if(err){this.emit('error',err);if(this.autoClose){this.destroy();}返回;}这个.fd=fd;this.emit('打开');})}}module.exports=WriteStream;然后我们实现write方法来调用可写流对象。在write方法中,我们首先将数据转换成buffer,然后实现一些事件触发条件的逻辑。如果现在不在写,我们就实际执行写操作。这里我们实现了一个_write方法来实现写操作。否则,表示正在写入文件。那么我们会将流式数据先放到缓冲区中,保证写入的数据不会被同时处理write(chunk,encoding=this.encoding,callback=()=>{}){chunk=Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);//write返回一个boolean类型this.length+=chunk.length;让ret=this.length{callback();this.clearBuffer();});//8}returnret;}_write(chunk,encoding,callback){if(typeofthis.fd!=='number'){returnthis.once('open',()=>this._write(chunk,编码,回调));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length-=byteWritten;this.pos+=byteWritten;回调();//清除缓冲区的内容});}在_write写入后的回调中,我们会调用传入的回调函数clearBuffer,该方法会继续递归地从buffer中取出数据,然后继续调用_write方法Write直到缓冲区中的所有数据都被取出,从而清除缓冲区clearBuffer(){letbuffer=this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing=false;if(this.needDrain){//是否需要触发drain?This.needDrain=false;this.emit('drain');}}}最后附上完整的代码letEventEmitter=require('events');letfs=require('fs');classWriteStreamextendsEventEmitter{constructor(path,options){super();this.path=路径;this.highWaterMark=选项。高水位||16*1024;this.autoClose=options.autoClose||true;this.mode=options.mode;this.start=options.start||0;this.flags=options.flags||'w';this.encoding=options.encoding||'utf8';//可写流必须有一个缓存区,在写入文件时,内容必须写入缓存区//在源代码中,它是一个链表=>[]this.buffers=[];//判断是否this.writing=错误的;//是否满足触发drain事件this.needDrain=false;//写入记录的位置this.pos=0;//记录缓冲区的大小this.length=0;这个.open();}destroy(){if(typeofthis.fd!=='number'){returnthis.emit('close');}fs.close(this.fd,()=>{this.emit('close')})}open(){fs.open(this.path,this.flags,this.mode,(err,fd)=>{if(err){this.emit('error',err);if(this.autoClose){this.destroy();}return}this.fd=fd;this.emit('open');})}write(chunk,encoding=this.encoding,callback=()=>{}){chunk=Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);//write返回一个boolean类型this.length+=chunk.length;让ret=this.length{callback();this.clearBuffer();});//8}返回ret;}clearBuffer(){letbuffer=this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing=false;if(this.needDrain){//是否需要触发drain?This.needDrain=false;this.emit('drain');}}}_write(chunk,encoding,callback){if(typeofthis.fd!=='number'){returnthis.once('open',()=>this._write(chunk,encoding,callback));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length-=byteWritten;this.pos+=byteWritten;callback();//清除缓冲区的内容});}}module.exports=WriteStream;和可写流,那么如何把两者结合起来呢,node给我们提供了一个很好的方法--Pipe管道,顾名思义就是在可读流和可写流之间加一条管道,实现边读边读写一点,读一点,写一点管道使用如下('./WriteStream');letrs=newReadStream(path.join(__dirname,'./1.txt'),{highWaterMark:4});letws=newWriteStream(path.join(__dirname,'./2.txt'),{highWaterMark:1});//41rs.pipe(ws);实现原理Pipe的原理比较简单,就是监听可读流的数据事件不断获取文件中的数据,然后我们会调用写流的write方法。如果可写流缓冲区满了,那么当我们调用可读流的pause方法暂停读取,然后等到写流的缓冲区写满触发drain事件时,再调用resume重新启动读取获取过程。上面的代码pipe(ws){this.on('data',(chunk)=>{letflag=ws.write(chunk);if(!flag){this.pause();}});ws.on('drain',()=>{this.resume();})}自定义流Node允许我们自定义流,读流继承自Readable接口,写流继承自Writable接口,所以我们其实可以自定义定义一个stream模块,只需要继承stream模块对应的接口即可。自定义可读流如果我们要自定义读取流,那么就需要继承Readable。Readable中有一个read()方法,默认调用_read(),所以我们只需要重写_read()方法即可。读书的逻辑。同时Readable还提供了push方法。调用push方法会触发数据事件。推送中的参数为数据事件回调函数的参数。当push传入的参数为null时,表示读取流停止,添加代码let{Readable}=require('stream');//实现任何流都继承这个流//有一个read()方法,默认有_read()//Readable提供了一个push方法当调用push方法时会触发data事件letindex=9;classMyReadextendsReadable{_read(){//什么时候调用push方法可读流停止?当push为null时停止if(index-->0)returnthis.push('123');这个。推(空);}}letmr=newMyRead();mr.on('data',function(data){console.log(data);});自定义可写流类似于自定义读取流。自定义写流需要继承Writable接口,实现一个_write()方法。注意这里是_write可以传入3个参数,chunk,encoding,callback,chunk是写入的数据,一般是buffer,encoding是编码类型,一般不用,最后一个callback要注意不是我们用的自定义写流调用write时的回调,而是我们上面提到的写流实现时的clearBuffer函数。let{Writable}=require('stream');//可写流实现了_write方法//源码中默认调用的是Writable类中的write方法MyWriteextendsWritable{_write(chunk,encoding,callback){console.日志(块。toString());打回来();//clearBuffer}}letmw=newMyWrite();mw.write('111','utf8',()=>{console.log(1);})mw.write('222','utf8',()=>{console.log(1);});DuplexduplexstreamDuplexstream其实就是结合了上面说的自定义读流和自定义写流,既可以读也可以写,同时可以实现读写相互干扰let{Duplex}=require('stream');//duplexstream可以读写,和read没关系(互不干扰)letd=Duplex({read(){this.push('hello');this.push(null);},write(chunk,encoding,callback){console.log(chunk);callback();}});d.on('data',function(data){console.log(data);});d.write('hello');Transform转换流的本质是double,唯一不同的是不需要像上面说的双工流那样实现读写,只需要实现转换的transform方法let{Transform}=require('stream');//参数同可写流lettransform1=Transform({transform(chunk,encoding,callback){this.push(块.toString().toUpperCase());//将输入内容放入一个可读流中callback();}});lettransform2=Transform({transform(chunk,encoding,callback){console.log(chunk.toString());callback();}});//等待你的输入//rs.pipe(ws);//希望将输入的内容转成大写输出process.stdin.pipe(tranform1).pipe(tranform2);//对象流只能把buffer或string对象流放在可读流中。默认情况下,流处理的数据是Buffer/String类型的值对象流??。特点是它有一个objectMode标志,我们可以设置它来允许流接受任何JavaScript对象。上面的代码const{Transform}=require('stream');letfs=require('fs');letrs=fs.createReadStream('./users.json');rs.setEncoding('utf8');让toJson=Transform({readableObjectMode:true,transform(chunk,encoding,callback){this.push(JSON.parse(chunk));callback();}});letjsonOut=Transform({writableObjectMode:true,transform(块,编码,回调){console.log(chunk);callback();}});rs.pipe(toJson).pipe(jsonOut);