节点stream很难理解和使用,但是“stream”是一个很重要的概念,会越来越普遍(fetch的返回值是stream),所以我们有必要认真研究stream.幸运的是,在nodestream之后,我们推出了一个更好用、更容易理解的webstreamsAPI。我们结合了WebStreamsEverywhere(和FetchforNode.js)的文章,2016-webstreams,ReadableStream和WritableStream学习它。节点流和网络流可以相互转换:.fromWeb()将网络流转换为节点流;.toWeb()将节点流转换为网络流。密集流(stream)是什么?stream是一个抽象的API。我们可以用promise做个类比。如果说promise是一个异步标准API,那么stream则希望成为I/O的标准API。什么是I/O?就是输入输出,也就是信息的读写,比如看视频、加载图片、浏览网页、编解码等,都属于I/O场景,所以不一定需要很大算作I/O的数据量。例如,读取磁盘文件算作I/O,读取“helloworld”字符串也算作I/O。Stream是I/O的当前标准抽象。为了更好的理解stream的API设计,让大家更深入的理解它,我们先来思考一个标准的I/OAPI应该如何设计?I/O场景应该如何抽象API?read()、write()是我们最先想到的API,如果继续添加,还有open()、close()等。这些API确实可以称为I/O场景标准API,而且足够简单。但是这些API都有一个缺点,就是缺乏对大数据量下读写的优化考虑。什么是大数据量的读写?比如读取一个几GB的视频文件,或者在2G的慢速网络环境下访问一个网页,在这些情况下,如果我们只有读写API,那么一个读取命令可能需要2个小时才能返回,而一条写命令需要3小时的执行时间,同时,对于用户来说,无论是看视频还是浏览网页,这么长的白屏时间都是无法接受的。但是为什么我们在看视频和浏览网页时不等那么久呢?因为在查看网页的时候,不需要等到所有资源都加载完毕之后再进行浏览和交互。许多资源在第一个屏幕呈现后异步加载。对于视频尤其如此。我们不会在加载30GB的电影后开始播放,但您可以先下载300kb的影片后开始播放。无论是视频还是网页,为了快速响应内容,在运行过程中不断加载资源。如果我们设计一个支持这种模式的API,那么无论资源大小,都可以覆盖。自然比read和wrte的设计更合理。.这种不断加载资源的行为就是流(stream)。什么是streamstream,可以认为是描述资源不断流动的状态。我们需要把I/O场景看成是一个连续的场景,就像把水从一条河引到另一条河一样。打个比方,当我们发送http请求,浏览网页,看视频的时候,我们可以把它看作是一个南水北调的过程,不断的把水从A河引到B河。当发送一个http请求,A河为后端服务器,B河为客户端;浏览网页时,A河是别人的网站,B河是你的手机;看视频的时候,A河是网络上的视频资源(当然也可以是本地的),B河是你的视频播放器。所以stream是一个连续的过程,可能有多个节点。不仅网络请求是流,资源加载到本地硬盘,读入内存,视频解码也是流。因此,在南水北调节点过程中,中途水库较多。把所有这些东西放在一起,你最终得到了网络流API。流分为三种,即:可写流、可读流、变换流,它们之间的关系如下:readablestreams代表一条河流,它是数据的来源。因为是数据源,所以只能读不能写。可写流代表B河,它是数据的目的地。因为需要不断地存水,所以只能写不能读。Transformstreams是中间转换数据的节点。例如,A河和B河之间有一座大坝。这个大坝可以通过蓄水来控制运水的速度,也可以安装一个过滤器来净化水源,所以一端是可写的流进入A河的水,另一端是提供可读的流给B河读书。乍一看,这是一个复杂的概念,但映射到河道排水系统上,就很自然了。Stream的设计非常贴近生活的概念。要理解流,需要思考以下三个问题:可读流从何而来?您想使用转换流进行中间件处理吗?消费可写流的逻辑是什么?我再解释一下为什么stream比read()和write()多了三个考虑因素:既然stream把I/O抽象成了stream的概念,也就是它具有持久化,那么读取的资源一定是一个可读的stream,所以我们需要构造一个可读的流(以后可能会有越来越多的函数返回流,也就是在流环境下工作,所以我们不需要考虑如何构造流)。读取流是一个连续的过程,所以不像调用函数读取一次那么简单,所以可写流也有一定的API语法。正是因为对资源的抽象,不管是读取还是消费,都包裹了一层streamAPI,普通的read函数读取的资源都是自己,所以没有这种额外的思考负担。好在webstreamsAPI设计相对简单易用,作为一个标准规范,掌握它更有必要。下面单独说明:可读流不能写入读流,所以只能在初始化时设置值:constreadableStream=newReadableStream({start(controller){controller.enqueue('h')controller.enqueue('e')controller.enqueue('l')controller.enqueue('l')controller.enqueue('o')controller.close()}})controller.enqueue()可以填任意值,相当于将值添加到队列中。controller.close()关闭后不能继续入队,这里的关闭时机会在writablestreamsresponse的close回调中。以上只是一个模拟的例子。在实际场景中,读取流往往是一些调用函数返回的对象。最常见的是fetch函数:asyncfunctionfetchStream(){constresponse=awaitfetch('https://example.com')conststream=response.body;}可以看出返回的response.bodyfetch函数是一个可读流。我们可以通过以下方式直接使用读取流:readableStream.getReader().read().then({value,done}=>{})或readableStream.pipeThrough(transformStream)到转换流或readableStream。pipeTo(writableStream)到可写流。不管是手动mock还是函数return,我们都可以猜到读取流一开始不一定是全数据的。比如response.body可能因为读取的比较早,所以需要等待,就像连接的水管的水流很慢一样。而源池中的水,也大同小异。我们也可以手动模拟慢读:constreadableStream=newReadableStream({start(controller){controller.enqueue('h')controller.enqueue('e')setTimeout(()=>{controller.enqueue('l')controller.enqueue('l')controller.enqueue('o')controller.close()},1000)}})在上面的例子中,如果我们从一开始就连接写流,我们必须等待1s获取完整的'hello'数据,但是如果1s后连接到writestream,瞬间就可以读取到完整的'hello'。另外,写流的处理速度也可能很慢。如果writestream处理每个word的时间是1s,那么writestream无论何时执行都比readstream慢。因此,可以认识到流的设计是为了让整个数据处理过程尽可能的高效。不管读流数据准备多晚,开始连接写流有多晚,写流处理多慢,整个链路尽可能的最高效:如果readableStream就绪晚了,我们可以稍后连接,以便readableStream准备好开始快速消费。如果writableStream处理慢,只是此时消耗慢,连接的“水管”readableStream可能已经准备好了。这时候换成高消耗的writableStream可以提高整体效率。可写流是不可读的,可以这样创建:constwritableStream=newWritableStream({write(chunk){returnnewPromise(resolve=>{//消费到哪里,可以进行插入dom控制台等操作。log(chunk)resolve()});},close(){//写流的时候controller.close(),这里调用这个},})写流不关心读流是什么,所以只关心数据写进去就可以了,并实现write回调write。写回调需要返回一个Promise,所以如果我们以较慢的速度消费块,写流的执行速度就会变慢。我们可以这样理解,A河引水到B河,都是灌进去的,但是B河的河道很窄,不能承受这么大的水流,所以受限于B河的河道宽度河流,整体水流速度还是比较缓慢的(当然这里不可能发生洪水)。那么writableStream是如何触发写入的呢?可以直接通过write()函数写:writableStream.getWriter().write('h')也可以通过pipeTo()直接连接readableStream,就像手动滴水一样,只不过现在是直接连接一个水pipe,这样我们只处理写入就可以输入:readableStream.pipeTo(writableStream)当然pipeTo的效果也可以通过最原始的API来组装。为了更深入的理解,我们用原来的方法模拟一个pipeTo:value})=>{if(done){return}writer.ready().then(()=>writer.write(value))tryRead()})}tryRead()transformstreams里面的transformstream是写流+读流。创建转换流的方法如下:constdecoder=newTextDecoder()constdecodeStream=newTransformStream({transform(chunk,controller){controller.enqueue(decoder.decode(chunk,{stream:true}))}})chunk是writableStream得到的packet,controller.enqueue是readableStream的enqueue方法,所以它的底层实现其实是两个stream的叠加,在API上简化为transform,读取的数据可以边写边写转换为读取流以供后续使用写入流。当然,还有很多原生的转换流可用,比如TextDecoderStream:consttextDecoderStream=TextDecoderStream()可读到可写流下面是一个完整的例子,包括编码和转码://创建一个可读流constreadableStream=newReadableStream({start(控制器){consttextEncoder=newTextEncoder()constchunks=textEncoder.encode('hello',{stream:true})chunks.forEach(chunk=>controller.enqueue(chunk))controller.close()}})//创建写入流constwritableStream=newWritableStream({write(chunk){consttextDecoder=newTextDecoder()returnnewPromise(resolve=>{constbuffer=newArrayBuffer(2);constview=newUint16Array(buffer);view[0]=chunk;constdecoded=textDecoder.decode(view,{stream:true});console.log('decoded',decoded)setTimeout(()=>{resolve()},1000)});},close(){console.log('writablestreamclose')},})readableStream.pipeTo(writableStream)首先,readableStream使用TextEncoder以极快的速度转换hello的五个字母加入队列,执行controller.close(),即瞬间初始化readableStream,以后不可修改,只能读取。在writableStream的write方法中,我们使用TextDecoder解码chunk,一次解码一个字母,打印到控制台,1s后解析,所以writestream会每隔1s打印一个字母:h#1slatere#1slaterl#1slaterl#1slaterowritablestreamclose这个例子的转码解码处理不够优雅,我们不需要在流函数中写转码解码,而是在转换流中,例如:readableStream.pipeThrough(newTextEncoderStream()).pipeThrough(customStream).pipeThrough(newTextDecoderStream()).pipeTo(writableStream)这样,readableStream和writableStream都不需要进行编解码处理,只是在中间将流转换成Uint8Array,便于其他转换流处理。最后,将转换流解码并转换成文本后,将pipeTo写入到流中,这样写入流入得到的就是文本。但情况并非总是如此。比如我们要传输一个视频流,readableStream的原始值可能已经是Uint8Array了,那么是否连接转换后的流就看情况了。总结Streams是用于I/O抽象的标准处理API。它支持连续小块数据处理的特性不是偶然的,而是I/O场景抽象后的必然。我们用水流的例子来比较溪流的概念。当发生I/O时,源流转换有一个固定的xM/s的速度,而目标客户端如视频转换也有一个固定的yM/s的速度。网络请求也是有速度的,是一个连续的过程,所以fetch自然是一个流,速度是zM/s,我们最终看到视频的速度是min(x,y,z)。当然,如果服务器提前提供了readableStream,那么x的速度可以忽略不计,此时看视频的速度为min(y,z)。不仅视频如此,打开文件、打开网页等也是如此。浏览器处理html也是一个流过程:newResponse(stream,{headers:{'Content-Type':'text/html'},})if这个readableStream的controller.enqueue过程刻意处理得很慢,网页甚至可以逐字逐句呈现:Servingastring,slowlyDemo。流的场景虽然这么普遍,但是没必要把所有的代码都改成流处理,因为代码在内存中执行的很快,变量赋值也不一定要用流处理,但是如果这个变量的值来自打开的文件或网络请求,使用流进行最有效的处理。讨论地址为:Jingdu《web streams》·Issue#363·dt-fe/weekly想参与讨论的请点这里,每周都有新话题,周末或周一发布。前端精读——帮你过滤靠谱的内容。关注前端精读微信公众号版权声明:免费转载-非商业-非衍生保留属性(CreativeCommons3.0License)