当前位置: 首页 > 科技观察

Node.jsStreamBackpressure——如果消费者端的数据积压来不及处理会怎样?

时间:2023-03-14 19:55:40 科技观察

Stream是Node.js中广泛使用的模块。流两端的可读流和可写流通过管道连接。通常,写入磁盘的速度低于从磁盘读取的速度。管道两端会存在压差,需要平衡机构才能使其从一端顺畅流向另一端。背压是指将数据写入流的速度超过其处理能力的术语。比如基于Stream写文件的时候,当写端无法处理时,会通知读端,你可以先等一下,我这边太忙了。。。等过一定时间再读写。问题的根源是“数据以流的形式从可读流流向可写流,不会全部读入内存。我想说的是上游流速太快,下游来不及消费,导致数据积压,即“背压”,会出现什么问题?这个问题来自“Nodejs技术栈-交流群”的朋友。当时没有给出答案,也没有做过类似的实际数据测试。这种情况通常会导致数据流向两端不平衡,另一端的数据不断积压,不断消耗系统内存,其他业务也难免受到影响。本文通过修改编译Node.js源码,在关闭“背压”后,进行了一些测试,可以清楚的看到两者的效果对比。流式数据读取->写入示例先构造一个大文件,我在本地创建了一个2.2GB的文件,通过大文件可以清楚的看出处理backlog和不处理backlog的区别。下面的例子实现了读取一个文件,gzip压缩后写入一个新的目标文件的功能。它也可以写成readable.pipe(gzip).pipe(writable)但这没有任何错误处理机制。您可以使用一些工具https://github.com/mafintosh/pump处理。对于处理此类任务,Stream模块还提供了一个实用的方法管道,可以在管道中处理不同的数据流。当其中一个数据流发生错误时,它会自动处理并释放相应的资源。//stream-back-pressure-test.jsconstgzip=require('zlib').createGzip();constfs=require('fs');const{pipeline}=require('stream/promises');constreadable=fs.createReadStream('2.2GB-file.zip');constwritable=fs.createWriteStream('2.2GB-file.zip.gz');(async()=>{try{awaitpipeline(readable,gzip,writable);控制台。log('Pipelinesucceeded.');}catch(err){console.error('Pipelinefailed.',err);}})();write()源码修改编译write(chunk)方法引入可写流??对象write(chunk)方法接收一些要写入流中的数据。当内部缓冲区小于创建可写流对象时配置的highWaterMark时,返回true,否则返回false,表示内部缓冲区已满或溢出,这是一种背压性能。向流写入数据的速度已经超过了它的处理能力。如果此时继续调用write()方法,可以想象内部缓冲区会不断增加,当前进程占用的系统内存也会不断增加。在使用pipe()或者pipeline进行内部处理时,也会调用stream.write(chunk)方法。stream.write(chunk)如果我们想测试一些数据积压导致的消费问题,我们需要修改Node.js源码,将stream.write(chunk)方法的返回值改为true来关闭积压处理。源码修改我直接拉了Master代码。刚开始忘记切换Node.js版本了。。。版本都差不多,大致相同。最主要的是找到Writable.prototype.write()方法。该方法最终的返回值是一个布尔值,找到returnret&&!state.errored&&!state.destroyed直接改成returntrue;禁用背压处理。//https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L334Writable.prototype.write=function(chunk,encoding,cb){return_write(this,chunk,encoding,cb)===true;};//https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L396//如果我们已经在写东西,那么只需将此//放入队列中,然后等待轮到。否则,call_write//如果返回false,则需要draineevent,sosetthatflag.functionwriteOrBuffer(stream,state,chunk,encoding,callback){...//stream._writeresetsstate.lengthconstret=state.length