看这个最简单的代码:import{Observable,of}from'rxjs';constobservable=of(1,2,3);observable.subscribe((message)=>{console.log(消息);});输出:将输入的1、2、3当成一个数组,触发fromArray函数调用:因为没有调度器调用,所以进入subscribeToArray分支:subscribeToArray的实现是在一个subscribeToArray的.ts文件中,命名空间这个文件的内容如下:internal/util注意,上图中第8行的for循环显然不会在of函数调用中执行,而只会在of返回的Observable被订阅时执行。subscribeToArray的逻辑是一个简单的for循环,在循环体内依次调用订阅者的next方法,最后调用complete方法。subscribeToArray返回的函数存储在Observable构造函数的_subscribe属性中。然后在this的返回的Observable实例上调用subscribe方法。Observable实例的_subscribe方法指向subscribeToArray刚刚返回的函数:这??个函数体直到Observable被订阅时才会执行。在函数体的for循环中,一个一个调用订阅者的next方法:订阅者不是应用开发者创建的,而是rxjs框架在内部维护和使用的。Subscriber有一个属性destination,指向Safesubscriber,这个safesubscriber的_next属性指向应用开发者维护的回调函数。该函数的执行顺序如下图所示:subscribe不仅传入单个回调函数,还支持error和complete处理。请参见以下示例:从'rxjs'导入{Observable,of};constobservable=of(1,2,3);observable.subscribe((message)=>{console.log(message);},()=>{},()=>{console.log('complete');});complete方法的执行逻辑与next方法类似,唯一不同的是它是在for循环体执行完成后触发:可见Observable的创建者是冷Observable。如果是周期性发射数据的Observable,我们也可以使用unsubscribe来取消订阅。看下面的代码:import{interval}from'rxjs';constobservable=interval(1000);constsubscription=observable.subscribe((x)=>console.log(x));setTimeout(()=>{订阅.unsubscribe();},4500);输出:Observable在输出4个整数后停止发射值。
