当前位置: 首页 > 后端技术 > Node.js

深入了解节点的Transform

时间:2023-04-03 16:31:34 Node.js

Transform流特征。开发中直接接触Transform流程的案例并不多。往往使用相对成熟的模块或封装的API来完成流程处理。最特别的是through2模块和gulp流操作。那么,Transform流程有哪些特点呢?从名字上看,Transform是加工的意思,类似于生产线上的每道工序,每道工序对进货的产品进行相应的加工;从结构上看,Transform是一个双工流,用通俗的说法可以解释为可读流,也可以是可写流。但是node根据自身的特点对Transform流做了更多的特殊定制,使得Transform不是一个纯粹的Duplex流。由于Transform流包含Readable和Writeable特性,因此Transform在实际使用中有多种方式:既可以只作为消费者消费数据,也可以同时作为生产者和消费者完成数据的中间处理。下面将逐步深入讲解Transform的运行机制和使用技巧。上图展示了一个Transform实例的组成部分:Readablepartialbuffer(数组)、内部_read函数、Writeablepartialbuffer(链表)、内部_write函数、Transform实例必须实现的内部_transform函数、系统提供的Callback函数afterTransform。由于Transform实例同时有两个buffer,所以也需要了解这两个buffer的存储和消费顺序,这对后面使用原生Transform写代码有很大的指导意义。传统意义上的流(即Readable和Writeable)的实现者需要实现相应的内部函数_read()和_write()。对于Readable实例,_read函数用于准备从源文件中获取数据并添加到readbuffer中;对于Writeable实例,_write函数一次性从writebuffer链表刷新到磁盘。它们分别对应读写过程的第一步和最后一步。详情可以关注文章《node.js中的Stream》。然而,Transform中_read和_write函数的实现却大相径庭。由于需要考虑流的处理,所以重点分析Transform的内部函数执行过程。示例演示:readable.pipe(transform);以上面示例代码为例,transform作为消费者消费readable。Transform的实例有transformState和readableState属性,保存了相关的属性,比如transform状态信息,回调函数存储和编码等。Transform作为消费者,会在write函数中消费数据。write函数的实现细节在node中的Stream一文中有介绍,数据的写入是通过内部调用_write函数实现的。在Transform中,重写了_write函数:保存transform接收到的chunk数据、代码和函数(执行刷新writebuffer),并在一定条件下执行_read函数(状态为非transformed时,只要readbuffersize不超过设置size,然后执行_read)如果一切顺利,可读数据会顺利执行transform的write->_write->_read,那么Transform在_read中发生了什么变化,本来负责填充的读取缓冲区?Transform.prototype._read=function(n){varts=this._transformState;if(ts.writecunk!==null&&ts.writecb&&!ts.transforming){ts.transforming=true;this._transform(ts.writechunk,ts.writeencoding,ts.afterTransform);}else{//标记我们需要一个转换,以便传入的任何数据//都将得到处理,现在我们已经请求了。ts.needTransform=true;}};可以看出_read的实现很简单,根据条件选择执行_transform函数。需要注意的是_read的参数n没有用到,因为是否向读缓冲区插入数据是由开发者在_transform中决定的。_transform函数相信大家都不陌生。Node规定Transform实例必须提供_transform函数,这个函数在_read中调用。_transform有三个参数,第一个是要处理的chunk数据,第二个是编码,第三个是回调函数。前两个参数很容易理解。我们可以在_transform中任意处理数据,最后调用回调函数完成处理。那么,这个回调函数到底是什么?就是Transform架构图中的afterTransform函数。它有几个功能:清除各种状态信息,比如transformState对象的一些属性,为下一步处理数据使用可选的保存处理结果到读缓冲区刷新写缓冲区,执行下一阶段的数据流处理。可以看出,执行完afterTransform函数后基本就宣告了第一阶段transform的结束。为什么是第一阶段?因为transform已经完成了consumer(即Writeable)的角色,如果用户传入数据到_transform中的writebuffer,那么transform同时也是producer,为后续consumer消费数据提供数据,这涉及到Transform的使用。Transform的生产和消费实例conststream=require('stream')varc=0;constreadable=stream.Readable({highWaterMark:2,read:function(){vardata=c<26?String.fromCharCode(c+++97):null;console.log('push',data);this.push(数据);}})consttransform=stream.Transform({highWaterMark:2,transform:function(buf,enc,next){console.log('transform',buf.toString());next(null,buf);}})readable.pipe(转换);示例代码简单,创建一个可读流,向消费者提供小写的a-z字母;创建一个transformationstream,在_transform函数中,不处理数据,只输出dot,数据通过回调函数传递给readbuffer。我们的目的是通过transform输出26个小写字母,但是目前程序执行的结果并不理想:执行结果:pushapushbtransformapushctransformbpushdpushepushftranform只处理字母b,readable只提供a-f数据然后就停了突然,为什么?这一切都归结为变换对象。仔细阅读以上内容,我们知道所有的Transform实例都同时拥有两个buffer。写缓冲区用于接收生产者的数据进行转换操作,读缓冲区用于为消费者缓存数据。目前的实现中,transform._transform函数输出要处理的数据,执行next(null,buf);同时。这个函数上面已经分析过了,就是afterTransform函数,第一个参数是Error实例,第二个是读缓冲区中存放的数据。本例中,执行完_transform后,会将处理后的数据存储到readbuffer中,readbuffer中的数据会被后续的consumer消费。但是transform后面没有consumer,所以transform处理完字母b存入readbuffer后,readbuffer已经满了(设置highWaterMark为2,即读写buffer的最大值为2个字节)。当字母c和d也执行到transform._write时,由于不满足执行transform._read的条件,无法执行transform._transform函数,无法执行afterTransform函数,导致数据刷新失败在写缓冲区中,导致在写缓冲区中存储字母c和d。因为transform的writebuffer已满(transform.write()返回false),所以只将字母e和f存放在readable的readbuffer中,等待消费。这创建了一个无限循环,readable和transform的所有缓冲区都已满,流停止。这个问题的解决方案很简单,有两种不同的解决方案:保持transform的readbuffer为空和增加consumer对transform的readbuffer的消耗。实际上,transform的readbuffer是真的被消耗了。第一种方案:保证transform的读缓冲区为空:consttransform=stream.Transform({highWaterMark:2,transform:function(buf,enc,next){console.log('transform',buf.toString())next(null,null)}})只需要将null传递给next函数,这样transform消费完数据后,就会宣告数据处理结束,读取缓冲区永远为空。方案二:添加消费者:consttransform=stream.Transform({highWaterMark:2,transform:function(buf,enc,next){console.log('transform',buf.toString())next(null,buf)}})readable.pipe(transform).pipe(process.stdout);transform的实现保持不变,但增加了消费者process.stdout。这样也保证了transform的readbuffer处于可添加状态,也让afterTransform函数有机会刷新writebuffer,开始新的数据处理过程。through2的实现through2的亮点是转换流程。使用through2的API可以很方便的创建一个Transform实例来完成对数据流的处理。functionthrough2(construct){returnfunction(options,transform,flush){if(typeofoptions=='function'){flush=transformtransform=optionsoptions={}}if(typeoftransform!='function')变换=noopif(typeofflush!='function')flush=nullreturnconstruct(options,transform,flush)}}module.exports=through2(function(options,transform,flush){vart2=newDestroyableTransform(options)t2._transform=transformif(flush)t2._flush=flushreturnt2})可以看出,through2模块只封装了Transform的构造函数,封装了更易用的objectMode模式。之所以推荐使用through2创建Transform对象,不仅是因为它提供了方便的API,也是为了兼容性。Transform对象是Stream2.0的一个特性,在早期版本的node中没有实现,通过through2创建的Transform实例在早期版本的node下仍然可以正常使用,因为through2没有引用stream模块默认由节点提供。相反,使用社区中更流行的“readable-stream”模块。小结本文旨在深入探讨through2中使用的Transform流,同时作为上一篇文章中流在节点中的回顾和应用。通过文末的简单示例,我们可以了解Transform在开发过程中可能出现的问题,学会随意切换Transform的生产者和消费者的身份,从而更好地指导实际开发。