前言说起“响应式编程”,你可能不陌生。不过,如果直接说出“流”这个名字,你可能会有点懵。其实,“流”的本质与“响应式编程”并无二致,两者都源于经典的前端设计模式——“观察者模式”。但是,在一定程度上,可以说“流”是基于这个模型的上层抽象,因为它的能力更多,功能更强大。在我的认知中,我把“流”分为“玄学”。在我们平时的开发中,最经典的使用“观察者模式”的方式就是“Vue”。其中,一方面,在“Vue”2x版本中,对应了一套基于Object.defineProperty()的依赖收集和分发更新。另一方面,“Vue”3.0版本对应了一套基于Proxy的依赖收集和分发更新。这里可以简单提一下“Vue”中的依赖收集和分发更新,对应2x中的watcher和dep。对应3.0中的effect和dep。那么,回到今天的话题,让我们来领略“流”的经典代表之一“RxJS”的魅力。WhatisRxJSOfficialDocumentation介绍:《RxJS》是一个使用Observables进行反应式编程的库,可以更轻松地编写异步或基于回调的代码。建议不了解ObservableAPI的同学可以移步学习https://github.com/tc39/proposal-observable,然后继续阅读本文。这样看,我们可以将“RxJS”与Promise进行比较,因为Promise的出现也是为了更容易编写异步或基于回调的代码。确实,很多说到“RxJS”的文章都会举例来比较两者的优缺点。因此,我这里不举例对比,只是简单罗列《RxJS》在使用Promise中解决的一些痛点,例如:控制异步任务的执行顺序。控制异步任务的启动和暂停。可以获得异步任务执行各个阶段的状态。监视异步任务执行期间发生的错误。在并发执行的情况下,一个异步任务出错不会影响其他异步代码的执行。RxJS的真面目(流)这里为什么要说“RxJS”的真面目呢?因为,上面提到的解决异步或者基于回调的代码繁琐问题,只是“RxJS”非常简单的一面,而它真正幻想的一面还在于它的“流”特性。相信大家在这之前应该都听说过“流”的概念,比如《Node》中的Stream流,《gulp》中的Stream流。那么,这里我们先来回顾一下这两个用到“流”的经典知识点。然后,一步步进入“RxJS”中“流”的世界。Node中的StreamNode中的Stream指的是一个“抽象接口”。比如文件读取,“HttpRequest”都实现了这个接口。此外,每个“流”都是EventEmitter的一个实例。EventEmitter的本质是一个简单的“观察者模式”,可以分发和监听事件。那么,我们通过一个简单的读取文件的例子来回忆一下“Node”中Stream的事件分发和监听:constfs=require("fs")letdata=''//创建一个文件读取流createReadStream('stream.txt','UTF8')//监听流数据读取事件readFileStream.on(data,(chunk)=>{data+=chunk})//监听流数据读取事件EventreadFileStream.on(end,(chunk)=>{console.log(`读取文件的结果为:${data}`)})//监听流读取文件异常事件readerStream.on('错误',(err)=>{console.error(err.stack)});这里我们可以看到,当文件读取流没有在监听数据的时候,是无法获取到我们需要的文件中的数据的。这对所有“流”都一样,需要订阅才能使用,因为“流”是惰性的。然后,我们再回忆一下“Node”中Stream的“管道流”概念,意思是“流”可以通过pipe链式调用,比如将文件读流直接写到文件写流中:constfs=require("fs")constreadFileStream=fs.createReadStream('readStream.txt','UTF8')constwriteFileStream=fs.createWriteStream('outStream.txt','UTF8')readFileStream.pipe(writeFileStream)一口吞下我不知道大家有没有注意到Stream,在“gulp”的官方文档中是这样介绍的:gulp.js是一个基于stream的自动化构建工具。而且,相信用过“gulp”的同学应该都知道,“流”是在“gulp”中以创建任务的形式使用的。具体的“流程”涉及到插件使用、文件处理和监控等细节。感兴趣的同学可以去gulp官方文档了解更多。比如我们要配置一个sass编译,会这样:constgulp=require('gulp')constsass=require('gulp-sass')//创建一个任务,scss目录的扩展是。sccss编译并输出到css文件gulp.task('sass-compile',function(){returngulp.src('scss/*.scss').pipe(sass({outputStyle:'expanded'}).on('error',sass.logError)).pipe(gulp.dest('css'))})//监听文件变化并执行sass编译任务gulp.watch('scss/*.scss',['sass-compile'])可见我们引入的gulp模块就是Stream本身。所以,正如“gulp官方文档”所说:gulp.js是一个基于流的自动化构建工具。嗯,回忆一下“Node”和“gulp”中的Stream。接下来,让我们一起走进《RxJS》中Stream的万千世界吧!RxJS中的Stream这里,我们通过《RxJS》中的几个关键词一步步认识它:1.ObservableObservable对象数据在Observable中流动,我们可以使用各种“运算符”Operators对流进行对流处理,例如过滤,重复数据删除。例如:importRxfrom'rxjs/Rx'constobservable=Rx.Observable.from([1,2,3])observable.filter(val=>val>=2).subscribe(val=>console.log(val))//232.Observer是一个“对象集合”,它存储用于监视可观察对象的回调。例如:importRxfrom'rxjs/Rx'constobservable=Rx.Observable.from([1,2,3])constobserver={next:val=>console.log(val),error:err=>console.error(err),complete:()=>console.log('completed')}observable.subscribe(observable)3.订阅订阅用于取消对可观察对象的订阅,例如在生命周期结束时一个组件应该退订。例如:importRxfrom'rxjs/Rx'import{Vue,Component}from'vue-property-decorator'@ComponentexportdefaultclassDialogextendsVue{privatesubscriptionprivatecreated(){constobservable=Rx.Observable.fromEvent(this.refs.btnCommit,'click')this.subscription=observable.subscribe({next:()=>{console.log('click')}})}privatedestroyed(){this.subscription.unsubscribe()}}4.Operatorsoperator指的是一些数组工具函数filter(),some(),map,flatMap等,这里就不举例了,和第一个例子一样使用filter()即可。Subject主体类似于上面提到的“Node”中的EventEmitter,分发事件,即广播。需要注意的是,Observable观察者的事件分发是单播的。例如:importRxfrom'rxjs/Rx'constsubject=newRx.Subject()subject.subscribe({next:val=>console.log(`Listeningobject1:${val}`)})subject.subscribe({next:val=>console.log(`monitoringobject2:${val}`)})subject.next(1)subject.next(2)/*output:listeningobject1:1listeningobject2:1listeningObject1:2Monitorobject2:2*/6.SchedulesScheduler负责调度任务,比如控制事件的执行顺序和排序等。它由这三部分组成:数据结构、任务的顺序存储。执行上下文,可以表示任务何时何地执行。一个(虚拟)时钟,任务的执行将遵循时钟设置。例如:importRxfrom'rxjs/Rx'constobservable=Rx.Observable.create(function(proxyOb){proxyOb.next(1)proxyOb.next(2)proxyOb.next(3)proxyOb.complete()}).observeOn(Rx.Scheduler.async)constobserver={next:x=>console.log(x),error:err=>console.error(err),完成:()=>console.log('completed'),};observable.subscribe(finalObserver)可以看到observer创建的时候调用了observerOn(),使用了scheduler的async。这里,所有订阅的next()会按照创建顺序将代码块放入消息队列中,以宏任务setTimeout或setInterval的形式执行,默认时钟设置为0。总结其实,介绍完这几个重点,我想大家应该明白上面说的解决Promise的痛点的由来了。而且,对于某些场景,《RxJS》官方文档也列出了如何使用,例如:全局状态存储的控制流结合第三方状态存储immutable结合React所以这里就不一一列举了,毕竟,文档已经说的很好了,就不用再抄了。感兴趣的同学可以自行学习。我写了最后一篇文章。其实一个月前,我就想总结分享一下,但各种因素差强人意。这段时间恰逢“端午节”,所以抽出几个小时来思考总结一下这段时间对“RxJS”和“流”的理解。当然,文章会有不足之处,欢迎大家提“Issue”。而且,我个人认为“流量”的存在很可能会改变未来的一些事情。比如携程前辈写的Model编程的前端架构方法中提到需要一个响应式的脱离平台的编程流程,比如“Redux”、“Vue3.0reactiveAPI”、“RxJS”等.有兴趣的同学可以继续了解这方面的内容。写作不易,但如果觉得有收获,可以来个帅气的三冲程!!!
