RxJS是一个响应式库,它接收从事件源发来的事件,经过处理管道的层层处理后传递给最终的接收者。管道由操作符组成。开发者只需要选择和组合运算符就可以完成各种异步逻辑,大大简化了异步编程。此外,RxJS的设计也遵循函数和流的概念。直接理解概念比较困难,所以先实现一个简单的RxJS再看这些。RxJS的使用RxJS会对事件源做一层封装,叫做Observable,将事件一个一个的发出去。例如:constsource=newObservable((observer)=>{leti=0;setInterval(()=>{observer.next(++i);},1000);});设置一个回调函数定时器不断的通过next传入事件。这些事件将由称为观察者的接收者监控。constsubscription=source.subscribe({next:(v)=>console.log(v),error:(err)=>console.error(err),complete:()=>console.log('complete'),});观察者可以接收next传递过来的事件。传输过程中可能会出现错误,这里也可以处理错误,也可以处理传输完成的事件。这种监控或订阅称为订阅。可以订阅,当然也可以取消订阅:subscription.unsubscribe();Observable返回取消订阅时的回调函数:constsource=newObservable((observer)=>{leti=0;consttimer=setInterval(()=>{observer.next(++i);},1000);返回函数unsubscribe(){clearInterval(timer);};});发送事件和监听事件只是基础,处理事件的过程才是RxJS的精髓精髓,它设计了pipeline的概念,可以用operator运算符组装:source.pipe(map((i)=>++i),map((i)=>i*10)).subscribe(()=>{//...})事件经过管道后会传递给Observer,会被传输过程中由运营商一一处理。例如这里的处理逻辑是将传入的数据加1,然后乘以10。综上所述,使用RxJS的代码是这样的:constsource=newObservable((observer)=>{leti=0;consttimer=setInterval(()=>{observer.next(++i);},1000);返回函数取消订阅(){clearInterval(timer);};});const订阅=来源。管道(地图((i)=>++i),地图((i)=>i*10))。subscribe({next:(v)=>console.log(v),error:(err)=>console.error(err),complete:()=>console.log('complete'),});setTimeout(()=>{subscription.unsubscribe();},4500);我们通过Observable创建了一个事件源,它每秒发出一个事件。这些事件将由管道处理,然后传递给观察者。pipeline由两个map操作符对数据做+1和*10处理。Observer接收传递的数据,打印它,并在最后处理错误和事件。另外,Observable提供了取消订阅时的处理逻辑。当我们在4.5s取消订阅时,定时器可以被清除。使用RxJS基本就是流程了,那么具体是怎么实现的呢?RxJS的80行代码实现从事件源开始,实现Observable:观察它的特点:它接收一个回调函数,可以调用next来传输数据。它有一个subscribe方法,可用于添加Observer的订阅,并返回订阅。可以在回调函数中返回unsbscribe处理逻辑。它有一个可以传递给操作员的管道方法。我们根据这些特点来实现一下:首先,Observable的构造函数需要接收回调函数_subscribe,但不是立即调用,而是在subscribe的时候调用:}subscribe(){this._subscribe();}}回调函数的参数是一个对象,有next、error、complete方法传递事件:classObservable{constructor(_subscribe){this._subscribe=_subscribe;}subscribe(observer){constsubscriber=newSubscriber(observer);this._subscribe(订户);}}classSubscriber{constructor(observer){super();this.observer=观察者;this.isStopped=false;}next(value){if(this.observer.next&&!this.isStopped){this.observer.next(value);}}错误(值){this.isStopped=true;如果(this.observer.error){this.observer.error(值);}}完成(){this.isStopped=true;如果(this.observer.complete){this.observer.complete();}if(this.unsubscribe){this.unsubscribe();这样,在回调函数中然后就可以调用next、error、complete方法了:另外,回调函数的返回值是unsbscribe的处理逻辑,需要在取消订阅时收集调用:classSubscription{constructor(){this._teardowns=[];}unsubscribe(){this._teardowns.forEach((teardown)=>{typeofteardown==='function'?teardown():teardown.unsubscribe()});}添加(拆解){如果(拆解){这个。_teardowns.push(拆解);}}}提供unsubscribe方法取消订阅,_teardowns用于收集所有取消订阅的回调,取消订阅时调用所有teardown回调这个逻辑比较通用,可以作为Subscriber的父类。然后,在Observable里调用add来添加拆解,并返回订阅(它有取消订阅方法):classObservable{constructor(_subscribe){this._subscribe=_subscribe;}subscribe(observer){constsubscriber=newSubscriber(observer);subscriber.add(this._subscribe(subscriber));返回订阅者;}}classSubscriberextendsSubscription{constructor(observer){super();this.observer=观察者;this.isStopped=false;}next(value){if(this.observer.next&&!this.isStopped){this.observer.next(value);}}错误(值){this.isStopped=true;如果(this.observer.error){this.observer.error(值);}}complete(){this.isStopped=true;如果(this.observer.complete){this.observer.complete();}if(this.unsubscribe){this.unsubscribe();}}}类下标ion{constructor(){this._teardowns=[];}unsubscribe(){this._teardowns.forEach((teardown)=>{typeofteardown==='function'?teardown():teardown.unsubscribe()});}添加(拆解){如果(拆解){this._teardowns.push(拆解);}}}这样,我们就实现了Observable和Observer,只写了50行代码。我们先测试一下:constsource=newObservable((observer)=>{leti=0;consttimer=setInterval(()=>{observer.next(++i);},1000);returnfunctionunsubscribe(){clearInterval(timer);};});constsubscription=source.subscribe({next:(v)=>console.log(v),error:(err)=>console.error(err),完成:()=>console.log('complete'),});setTimeout(()=>{subscription.unsubscribe();},4500);Observer监听了Observable传过来的1,2,3,4数据,因为订阅在4.5s取消了,所以后面就没有数据了。我们用50行实现了基本的RxJS!当然最本质的operator还没有实现,我们会继续完善。我们给Observable添加一个pipe方法,它会调用传入的operator,前一个的结果是下一个的输入,所以连接在一起,这就是pipeline的概念:classObservable{constructor(_subscribe){//..}subscribe(observer){//...}pipe(...operations){returnpipeFromArray(operations)(this);}}functionpipeFromArray(fns){if(fns.length===0){return(x)=>x;}if(fns.length===1){returnfns[0];}返回(输入)=>{返回fns。减少((上一个,fn)=>fn(上一个),输入);};}当传入0个参数时,直接返回之前的Observable,为1则直接返回,否则通过reduce串接形成管道。operator的实现是监听之前的Observable并返回一个新的Observable。比如map的实现就是传入project处理value,将结果传递给next:.subscribe({next(value){returnsubscriber.next(project(value));},error(err){subscriber.error(err);},complete(){subscriber.complete();},});returnsubscription;});}这样,我们就实现了operator,我们来测试一下:我们调用pipe方法,使用两个mapoperator来组织处理流程,对数据进行+1和*10处理。因此,当Observable传递的1、2、3、4传递给Observer时,就变成了20、30、40、50。至此,我们实现了Observable、Observer、Subscription、运算符,它是RxJS的简化版本。仅使用了80行代码。我们再看看最初的想法:为什么叫响应式?因为它监听事件源并进行一系列的处理,所以这种编程模式被称为响应式。为什么叫函数式呢?因为算子的每一步都是一个纯函数,返回一个新的Observable,符合函数的不可变性,修改后返回一个新的概念。为什么叫流量呢?因为每个事件都是动态生成和传输的,所以这种数据的动态生成和传输可以称为流。完整代码如下:functionpipeFromArray(fns){if(fns.length===0){return(x)=>x;}if(fns.length===1){returnfns[0];}return(input)=>{returnfns.reduce((prev,fn)=>fn(prev),input);};}classSubscription{constructor(){this._teardowns=[];}unsubscribe(){this._teardowns.forEach((teardown)=>{typeofteardown==='function'?teardown():teardown.unsubscribe()});}添加(拆解){如果(拆解){this._teardowns.push(拆解);}}}classSubscriberextendsSubscription{constructor(observer){super();this.observer=观察者;this.isStopped=false;}next(value){if(this.observer.next&&!this.isStopped){this.observer.next(value);}}错误(值){this.isStopped=true;如果(this.observer.error){this.observer.error(值);}}complete(){this.isStopped=true;如果(this.observer.complete){this.observer.complete();}if(this.unsubscribe){this.unsubscribe();}}}classObservable{constructor(_subscribe){this._subscribe=_subscribe;}subscribe(observer){constsubscriber=newSubscriber(observer);subscriber.add(this._subscribe(subscriber));返回订阅者;}pipe(...operations){returnpipeFromArray(operations)(this);}}functionmap(project){return(observable)=>newObservable((subscriber)=>{constsubcription=observable.subscribe({next(value){returnsubscriber.next(project(value)));},错误(err){subscriber.error(err);},complete(){subscriber.complete();},});返回订阅;});}constsource=newObservable((observer)=>{leti=0;consttimer=setInterval(()=>{observer.next(++i);},1000);返回函数unsubscribe(){clearInterval(计时器);};});constsubscription=source.pipe(map((i)=>++i),map((i)=>i*10)).subscribe({next:(v)=>console.log(v),错误:(err)=>console.error(err),complete:()=>console.log('complete'),});setTimeout(()=>{subscription.unsubscribe();},4500);小结为了理解RxJS的响应式、函数式、流式等概念,我们实现了一个简单版的RxJS。我们实现了Observable、Observer、Subscription等概念,完成了Event的生成和订阅、取消订阅。然后实现operator和pipe,每个operator返回一个新的Observable,对数据进行逐层处理。写完之后,我们可以更清楚地理解响应式、功能性、流程等概念在RxJS中是如何体现的。只需要80行代码就可以实现一个简单版本的RxJS。
