当前位置: 首页 > 后端技术 > Node.js

巧妙的复制一个stream

时间:2023-04-03 19:26:44 Node.js

场景在实际业务中,可能会重复消费一个可读stream,比如在pre-filter中解析requestbody,获取body进行相关权限和身份认证;认证通过后,框架或后置过滤设备再次解析请求体,传递给业务上下文。因此,需要重复消费同一个流也就不足为奇了。这类似于通过在js上下文中深度克隆对象来操作对象的副本,以防止源数据被污染。constKoa=require('koa');constapp=newKoa();letparse=function(ctx){returnnewPromise((res)=>{letchunks=[],len=0,body=null;ctx.req.on('data',(chunk)=>{chunks.push(chunk)len+=chunk.length});ctx.req.on('end',()=>{body=(Buffer.concat(chunks,len)).toString();res(body);});})}//认证app.use(async(ctx,next)=>{letbody=JSON.parse(decodeURIComponent(awaitparse(ctx)));if(body.name!='admin'){returnctx.body='permissiondenied!'}awaitnext();})//解析body传递给业务层app.use(async(ctx,next)=>{letbody=awaitparse(ctx);ctx.postBody=body;awaitnext();})app.use(asyncctx=>{ctx.body='HelloWorld\n';ctx.body+=`postbody:${ctx.postBody}`;});app.listen(3000);上面的代码片段无法正常运行,无法响应请求。这是因为在前置过滤器的鉴权逻辑中消费了请求体,在二级过滤器中无法再次消费请求体,所以请求会被阻塞。在实际业务中,认证逻辑往往与各个公司规范相关,是一个“第二方库”;而例子中的第二季filter通常是作为第三方库存在的,所以为了不影响第三方包消费requestsbody,必须存放在经过认证的第二方包ctx.req中,这个数据可读流仍然存在,这就涉及到本文的主旨。实现复制流并不像复制对象那样简单直接。流的使用是一次性的。一旦可读流被消费(写入Writeable对象),那么这个可读流是不可更新的,不能被重用。使用。然而,通过一些简单的技术,可以再次恢复可读流。但是,虽然恢复后的流的内容和之前的流一样,但不是同一个对象。因此,这两个对象的属性和原型是不同的,这种情况经常发生。会影响后续的使用,不过办法总是有的,见下文。实现一:可读流的“影子克隆技术”可读流的“影子克隆技术”与火影的类似,但仅限于克隆对象流的特性,即保证克隆流具有相同的数据。但是克隆出来的流不能有原对象的其他属性,但是我们可以通过原型链继承来实现属性和方法的继承。letReadable=require('stream').Readable;letfs=require('fs');letpath=require('path');classNewReadableextendsReadable{constructor(originReadable){super();this.originReadable=originReadable;这个。开始();}start(){this.originReadable.on('data',(chunck)=>{this.push(chunck);});this.originReadable.on('end',()=>{this.push(null);});this.originReadable.on('error',(e)=>{this.push(e);});}//作为Readable的实现类,必须实现_read函数,否则会抛出Error_read(){}}app.use(async(ctx,next)=>{letcloneReq=newNewReadable(ctx.req);letcloneReq2=newNewReadable(ctx.req);//此时ctx.req已经被消费(无内容),所有数据完全在克隆的两个流上//消费cloneReq获取鉴权数据letbody=JSON.parse(decodeURIComponent(awaitparse({req:cloneReq})));//重置cloneReq2克隆的原型链,继承ctx.req的原有属性cloneReq2.__proto__=ctx.req;//之后,再给ctx.req被复制并留给后续的过滤器消费ctx.req=cloneReq2;if(body.name!='admin'){returnctx.body='permissiondenied!'}awaitnext();})avatar的技术可以同时复制多个可读流。同时需要重新赋值原来的流,继承原来的属性,以免影响后续的重复消费。实现2:lazyimplementstream模块有一个特殊的类,即Transform。关于Transfrom的特点,我在深入节点的Transform一文中有详细介绍。它具有可读可写流的双重特性,因此使用Transfrom可以快速方便地实现克隆。首先,可读流通过管道函数指向两个Transform流(之所以有两个是因为一个流需要在pre-filter消费,第二个在后续filter消费)。让cloneReq=newTransform({highWaterMark:10*1024*1024,transform:(chunk,encode,next)=>{next(null,chunk);}});让cloneReq2=newTransform({highWaterMark:10*1024*1024,transform:(chunk,encode,next)=>{next(null,chunk);}});ctx.req.pipe(cloneReq)ctx.req.pipe(cloneReq2)在上面的代码中,它看起来像CTX。请求流被消耗(管道)两次。其实pipe函数可以看作是Readable和Writeable实现背压的“语法糖”实现。具体可以通过node中的Stream-Readable和Writeable来理解,所以得到的结果是“ctx.req被消费了一次,但是在cloneReq和cloneReq2这两个Transfrom对象的readbuffer中复制了数据,并且克隆已实施。”其实pipe对于Readable和Writeable都进行了限流,先为Readable的数据事件监听,执行Writeable的write函数。当Writeable的writebuffer大于一个临界值(highWaterMark)时,write函数返回false(这意味着Writeable赶不上Readable的速度,Writeable的writebuffer已满。),此时,pipe修改Readable模式,执行pause方法,进入paused模式,停止读取readbuffer。同时Writeable开始刷新writebuffer,刷新完成后异步触发drain事件。在事件处理函数中,将Readable设置为流动状态,继续执行流动函数不断刷新读缓冲区,从而完成管道限流。需要注意的是,Readable和Writeable各自维护一个buffer,在实现上有区别:Readable的buffer是一个数组,存储Buffer、String和Object类型;而Writeable是一个有向链表,里面存放的是需要写入的数据。最后,在复制数据的同时,将额外的属性复制到其中一个对象://重置cloneReq2原型链,继承ctx.req的原始属性cloneReq2.__proto__=ctx.req;//之后再次复制ctx.req,并将其留给后续的过滤器消耗ctx.req=cloneReq2;至此,通过Transform的克隆就完成了。完整代码如下(最前置过滤器)://authenticationapp.use(async(ctx,next)=>{//letcloneReq=newNewReadable(ctx.req);//letcloneReq2=newNewReadable(ctx.req);让cloneReq=newTransform({highWaterMark:10*1024*1024,transform:(chunk,encode,next)=>{next(null,chunk);}});让cloneReq2=newTransform({highWaterMark:10*1024*1024,transform:(chunk,encode,next)=>{next(null,chunk);}});ctx.req.pipe(cloneReq)ctx.req.pipe(cloneReq2)//此时ctx.req已经被消费完了(没有内容),所有数据完全在克隆的两个流上//消费cloneReq,获取鉴权数据letbody=JSON.parse(decodeURIComponent(awaitparse({req:cloneReq}))));//重置克隆的cloneReq2的原型链,继承ctx.req原有的属性cloneReq2.__proto__=ctx.req;//之后重新复制ctx.req,交给后续的filter消费ctx.req=cloneReq2;if(body.name!='admin'){returnctx.body='permissiondenied!'}awaitnext();})表示执行了两次ctx.req来管道对应的cloneReq和cloneReq2,然后立即消费cloneReq对象是否合理?如果源数据足够大,cloneReq在管道结束前被消费。会不会有什么问题?实际上,大部分的管道函数都是异步操作,即对源流和目的流的一些流控措施。目标流使用cloneReq对象。在对象的实例化过程中,transform函数通过调用next函数直接传递接收到的数据。进入Transform对象的可读流缓冲区,同时触发“可读和数据事件”。这样我们下面对cloneReq对象的消费也是通过“监听数据事件”来实现的,所以即使ctx.req的数据还没有被消费,下面的cloneReq对象还是可以正常消费的。从ctx.req-->cloneReq-->仍然可以看出数据流是消费。使用Transform流实现clone可读流的缺点:在上面的例子中,Transfrom流的实例化传入了一个参数highWaterMark。当readbuffersizehighWater,transfrom方法不会执行,后续不会消费Transfrom流(当Transfrom流被消耗后,Transfrom流的读缓冲区会变小,当其大小