连接上面的输出日志信息call:2ConcatMapRxNewThreadScheduler-5onNext:ConcatMap101ConcatMapcall:2ConcatMapRxNewThreadScheduler-6onNext:ConcatMap102ConcatMapcall:2ConcatMapRxNewThreadScheduler-7onNext:ConcatMap103ConcatMaponCompleted:这个log信息很通过concatfMaplat输出很容易看出,flatMap不保证数据源的顺序,而ConcatMap算子保证了数据源的顺序。在应用中,如果对数据的顺序有要求,就需要使用ConcatMap。如果不需要,两者都可以使用。SwitchMap当原来的Observable发出一个新的数据(Observable)时,它会取消订阅并停止监听产生之前数据的Observable,只监听当前的Observable。Integer[]integers={1,2,3};Observable。from(integers).switchMap(newFunc1>(){@OverridepublicObservablecall(Integerinteger){Log.e(TAG,"call:SwitchMap"+Thread.currentThread().getName());//如果不是通过subscribeOn(Schedulers.newThread())是模拟子线程中的并发操作,所有的数据源还是会输出,即这个运算符只对并发操作有效//这里如果通过Thread.sleep()设置等待时间,则输出信息不会相同。等价于模拟并发度@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:SwitchMap");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:SwitchMap");}@OverridepublicvoidonNext(字符串){Log.e(TAG,"onNext:SwitchMap"+s);}});输出日志信息call:SwitchMapmaincall:SwitchMapmaincall:SwitchMapmainonNext:SwitchMap106SwitchMaponCompleted:SwitchMap当数据源很多的时候,不一定只输出最后一项数据,可能输出几项数据,也可能输出全部。看到GroupBy这个词,你应该会想到这个运算符的作用,也就是你理解的意思。它根据您的协议对数据源进行分组。我们通过groupBy将数据从1分到10,代码如下newSubscriber>(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:1");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:1");}@OverridepublicvoidonNext(GroupedObservablebooleanIntegerGroupedObservable){booleanIntegerGroupedObservable.toList().subscribe(newSubscriber>(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:2");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:2");}@OverridepublicvoidonNext(Listintegers){Log.e(TAG,"onNext:2"+integers);}});}});输出日志信息onNext:2[1,3,5,7,9]onCompleted:2onNext:2[2,4,6,8,10]onCompleted:2onCompleted:1上面代码中booleanIntegerGroupedObservable变量有一个getKey()方法,返回group的key,其值是groupBy方法调用callback函数的值,上面是整数%2==0的值,true和false。几个组也由这个值决定。Scan运算符将一个函数应用于原始Observable发出的第一项,然后将该函数的结果作为它自己的第一项发出。它用函数的结果和第二个数据填充函数以生成它自己的第二个数据。它继续这个过程以生成剩余的数据序列。比如计算1+2+3+4的和Observable.range(1,4).scan(newFunc2(){@OverridepublicIntegercall(Integerinteger,Integerinteger2){Log.e(TAG,"call:integer:"+integer+"integer2"+integer2);returninteger+integer2;}}).subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:");}@OverridepublicvoidonNext(Integerinteger){Log.e(TAG,"onNext:"+整数);}});输出日志信息onNext:1call:integer:1integer22onNext:3call:integer:3integer23onNext:6call:integer:6integer24onNext:10onCompleted:scan有一个重载的方法,可以设置一个初始值,比如上面的代码,初始值为设置为10,只需要加一个参数scan(10,newFunc2)进行扫描即可。Buffer运算符将一个Observable转换为另一个。原始Observable正常发射数据,而生成的Observable发射这些数据的缓存集合。如果原始Observable发出onError通知,Buffer将立即传递通知而不是先发出通知。缓存的数据,即使缓存之前包含原始Observable发出的数据。示例代码Observable.range(10,6).buffer(2).subscribe(newSubscriber>(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:");}@OverridepublicvoidonNext(Listintegers){Log.e(TAG,"onNext:"+integers);}});输出日志信息onNext:[10,11]onNext:[12,13]onNext:[14,15]onCompleted:上面一次订阅了两条数据。如果参数设置为6,则一次性订阅。buffer的另一种重载方法buffer(count,skip)从原来的Observable的第一项数据创建一个新的缓存(长度count),然后每当接收到skip项数据时用count项数据填充缓存:在开始一项和后续的count-1项,以列表(List)的形式发出缓冲区,根据count和skip的值,这些缓冲区可能有重叠的部分(比如skipcount)。对于具体的执行结果,可以设置不同的skip和counts来观察输出日志,查看执行结果和过程。WindowWindow类似于Buffer,但它不是从原始Observable发出数据包,而是发出Observable,每个Observable发出原始Observable数据的子集,最后发出onCompleted通知。Observable.range(10,6).window(2).subscribe(newSubscriber>(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted1:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError1:");}@OverridepublicvoidonNext(ObservableintegerObservable){Log.e(TAG,"onNext1:");tv1.append("\n");integerObservable.subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted2:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError2:");}@OverridepublicvoidonNext(Integerinteger){Log.e(TAG,"onNext2:"+整数);}});}});输出日志信息onNext2:10onNext2:11onCompleted2:onNext2:12onNext2:13onCompleted2:onNext2:14onNext2:15onCompleted2:onCompleted1:window也有buffer等不同的重载方式。相对于其他算子,这两个算子不太好理解,可以去RxJavaGitHub了解,里面有图解。当然,最好的理解方式是通过改变变量的值来观察输出的日志信息。好了,这篇文章就介绍到这里了。文中如有错误,请指正。谢谢。
