当前位置: 首页 > 后端技术 > Node.js

Node.js中使用pipe处理数组的实现

时间:2023-04-03 20:11:56 Node.js

TLDR;这篇文章的风格是向吉姆先生致敬;敬礼,敬礼,明白,不是抄袭,程序员怎么能叫抄袭呢。当然,我也在学习和销售Node.js的流。如有使用不当,请指出。原文链接欢迎star。写这篇文章的初衷是多年前看SICP的时候。我在第2章介绍数据抽象的构造时,提到过Lisp用一种类似于“信号流”的方法来处理序列。于是很自然地想到了Node.js中的pipe方法,所以一直想用pipe方法试试。正如吉姆先生的文章所述,我也患上了懒癌。从年底拖到今年年初,然后看到了Jim先生年初的博客。深受启发,终于下定决心开始码字了。。。。。然后,嗯,一直拖到昨天。下定决心写的主要原因是昨天的部门年会!反正年会和我这种死肥宅关系不大。趁着大家高兴的时候构思了代码实现,回家就加了代码,用了一晚上。Jim先生在他的文章中也说过,JS的那些数组操作(map/reduce/filter)每次调用都会进行一次完整的遍历。想象一下,如果有一个数组,首个数为1,长度为1亿加1,需要将所有数组乘以3,然后排除奇数。如果使用(map/filter)方式,只要还需要循环1.5亿次;如果有另一种方法只循环1亿次,是否会节省大量内存资源和循环消耗的时间?话不多说,我们直接上代码。pipe在编写代码时,我们应该有一些方法来连接程序,就像连接水管一样——当我们需要获取一些数据时,我们可以“拧紧”其他部分来达到目的。这也是IO应该做的。——道格·麦克罗伊。1964年10月11日有关节点上的流,请参阅本文。以下是代码部分。学习pipe的时候,一晚上就草草的写出了全部代码。我又懒又癌,不想再重构了。请仔细阅读,请勿喷码。entryconststream=require('stream')constlast=Symbol()//在selfArray中接收一个真正的数组//返回一个可读流//如果做的再细一点,可以做成可读可写流,这样就可以通过控制stream的大小来控制内存的大小,以免上亿条数据直接爆内存//但是后面reduce的处理比较麻烦functionselfArray(a){constrs=newstream.Readable({objectMode:true})a.forEach((v,index)=>{rs.push(v)})rs.push(last)rs.push(null)返回rs上面的selfArray在stream的末尾Push一个Symbol对象来标记整个stream的输入结束,留待reduce后面使用。Map/Filter/Reduce的实现函数forEach(callback){constws=newstream.Writable({objectMode:true})letindex=0ws._write=function(chunk,enc,next){if(chunk!==last){callback(chunk,index++)next()}}returnws}functionfilter(callback){consttrans=newstream.Transform({readableObjectMode:true,writableObjectMode:true})让索引=0trans._transform=function(chunk,enc,next){if(chunk===last){next(null,last)}else{letcondition=callback(chunk,index++)if(condition){this.push(chunk)}next()}}returntrans}functionmap(callback){consttrans=newstream.Transform({readableObjectMode:true,writableObjectMode:true})让索引=0trans._transform=function(chunk,enc,next){if(chunk===last){next(null,last)}else{next(null,callback(chunk,index++))}}returntrans}function红色uce(callback,initial){consttrans=newstream.Transform({readableObjectMode:true,writableObjectMode:true})letindex=0,current=initial,prev=initialtrans._transform=function(chunk,enc,next){if(chunk===last){if(index>1){prev=callback(prev,current,index-1)}this.push(prev)this.push(last)returnnext(null,last)}if(initial===void0&&index===0){prev=chunk}if(index>0){prev=callback(prev,current,index-1)}current=chunkindex++next()}returntrans}上面的代码在reduce的实现上有点麻烦。Reduce在没有初值,原数组为空的情况下,有多种处理情况。看了MDN的解释,自己实现了selfArray([9,2,6,3,5,6,7,1,4,4]).pipe(map(v=>v*3)).pipe(filter(v=>v%2)).pipe(reduce((p,c)=>p+c,0)).pipe(forEach(v=>{console.log('最终结果管道计算是:',v)}))我故意把各种括号都去掉了。嗯,它看起来很完美,让我们测试一下selfArray([9,2,6,3,5,6,7,1,4,4]).pipe(map(v=>{console.log('map:',v)返回v*3})).pipe(filter(v=>{console.log('filter:',v)returnv%2})).pipe(reduce((p,c)=>{console.log('reduce:',p,c)returnp+c},0)).pipe(forEach(v=>{console.log('pipe计算的最终结果为:',v)}))添加日志后可以看到结算结果为:map:9filter:27map:2filter:6map:6filter:18map:3filter:9reduce:027map:5filter:15reduce:279map:6filter:18map:7filter:21reduce:3615map:1filter:3reduce:5121map:4filter:12map:4filter:12reduce:723pipe最终计算结果为:75从上面的日志我们可以看到第一个数字9先执行map,然后3之后就直接执行了进入过滤器。这时候第二个数字2也被map处理了,然后又被filter处理了。但是由于3之后的偶数不会被reduce接收到,所以reduce会一直等到第二个奇数,也就是3进入之后。会一直处理……嗯,直到最终计算结果为75,被forEach消费掉。总结虽然我没有像Jim老师那样进行性能测试,但是我猜pipe方法在量比较少的情况下肯定比normal方法要弱。pipe的好处是在数据量比较大的时候可以使用比较小的内存。尽快处理数组中前面的数据。