当前位置: 首页 > 科技观察

RxJava算子系列三(上)

时间:2023-03-16 01:13:51 科技观察

RxJava算子系列传送门RxJava算子源码https://github.com/xiehui999/fuseProgramRxJava算子系列一RxJava算子系列二前言在上一篇文章中,我们介绍了一些Observable的创建和数据引入了转换运算符。一些数据转换操作符还是比较难理解的,但是相信如果敲代码,修改各种参数的值观察执行日志,相信还是很容易理解的。在官方网站上,为每个运营商提供了图例。如果你对文字的理解不够清晰,也可以参考插图来帮助理解。在本文中,我们将介绍一些常用的过滤运算符。RxJava中的过滤操作符也比较容易理解。好吧,让我们一起继续学习之旅吧。Filter这个操作符接收一个Func1参数,在这个参数中我们可以根据自己的判断条件来判断我们要过滤的数据。当数据通过判断条件时,返回true表示数据发出,否则不发出,所以会过滤掉我们想要的数据。如下,我们过滤掉不能被2整除的数。Integer[]ints={1,2,3,4,5,6,7,8,9};Observableobservable=Observable.from(ints).filter(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger%2!=0;//返回true,不会被过滤掉,会发出数据,过滤掉返回false的值}});Action1action1=newAction1(){@Overridepublicvoidcall(Integeri){Log.e(TAG,"call:"+i);}};observable.subscribe(action1);输出日志信息call:1call:3call:5call:7call:9ofType运算符是过滤运算符的一种特殊形式。它过滤一个Observable,只返回指定类型的数据。比如当数据源有string和int数据时,我们可以使用这个操作符过滤掉字符串。下面的示例代码是Observable.just(0,"one",6,4,"two",8,"three",1,"four",0).ofType(String.class).subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:ofType");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:ofType");}@OverridepublicvoidonNext(Stringstring){Log.e(TAG,"onNext:ofType"+string);}});输出日志信息onNext:ofTypeoneonNext:ofTypetwoonNext:ofTypethreeonNext:ofTypefouronCompleted:ofType当然除了过滤基本类型的数据外,还可以过滤自定义类型的数据。First如果我们只对Observable发出的第一项数据感兴趣,或者满足某个条件的第一项数据,我们可以使用First运算符。Observable.just(10,11,12,13)??.first().subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,integer+"");}});上面的日志只打印了一个值10。当然我们也可以给first传一个参数Fun1,指定一个条件如下Observable.just(10,11,12,13)??.first(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>12;}}).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,integer+"");}});此时输出的信息是满足integer>12data13的***项。firstOrDefault此运算符是第一个运算符的变体。主要是在没有数据发出时发出一个你在参数中指定的默认值。如下,它有两个重载方法。Observable.just(11,12,13)??.firstOrDefault(10).subscribe(newAction1(){@Overridepublicvoidcall(Objecto){Log.e(TAG,o.toString());}});如果写成上面的代码,这次执行的效果和第一次是一样的。因为没有数据发射时使用默认值,所以我们将上面的代码改成如下,使用empty创建一个不发射任何数据但正常终止的Observable。Observable.empty().firstOrDefault(10).subscribe(newAction1(){@Overridepublicvoidcall(Objecto){Log.e(TAG,o.toString());}});发现此时输出了数据10。这个算子还提供了一个重载方法firstOrDefault(TdefaultValue,Func1superT,Boolean>predicate),有两个参数。我们可以添加一个条件。下面的例子是Observable.just(10,13,16).firstOrDefault(15,newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>20;}}).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,""+integer);}});此时数据源10、13、16不满足大于20,此时会输出默认值15。如果我们设置数据源的数据增加了一个值22。那么此时不再输出默认值,而是输出22。takeFirst操作符和first操作符的区别在于,如果原始Observable没有发出任何满足条件的数据,first将抛出NoSuchElementException并直接执行onError(),而takeFist将返回一个空的Observable(不调用onNext()但将调用onCompleted)如下示例代码所示Observable.just(10,11).filter(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>20;}}).first().subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:"+e.toString());}@OverridepublicvoidonNext(Objecto){Log.e(TAG,"onNext:"+o.toString());}});执行后的输出信息如下,11).takeFirst(newFunc1(){@OverridepublicBooleancall(Integerinteger){Log.e(TAG,"call:takeFirst");returninteger>30;}})。subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted:");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError:"+e.toString());}@OverridepublicvoidonNext(Objecto){Log.e(TAG,"onNext:"+o.toString());}});发现此时不会有异常,但是执行了onCompleted()single。如果原始Observable在完成之前没有恰好发射一次数据,它将抛出NoSuchElementException。大白话可以理解为发送的数据是一个item,然后输出这个item的值,如果有多个数据,会抛出异常并执行onError()方法。下面的代码Observable.just(10,11,12,13)??.single().subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError"+e.toString());}@OverridepublicvoidonNext(Integerinteger){Log.e(TAG,integer);}});输出信息onError:java.util.NoSuchElementException:Sequencecontainsnoelementsif对上面的代码做简单的修改Observable.just(10,11,12,13)??.filter(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>12;}}).subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){Log.e(TAG,"onCompleted");}@OverridepublicvoidonError(Throwablee){Log.e(TAG,"onError"+e.toString());}@OverridepublicvoidonNext(Integerinteger){Log.e(TAG,integer);}});此时会输出数据13,因为经过过滤后只有一条数据。single还有两个变体,singleOrDefault(T)和singleOrDefault(T,Func1),可以通过测试自己的代码来区分。last运算符与first的含义相反。如果我们只对Observable发射的最后一项数据感兴趣,或者满足特定条件的最后一项数据,请使用此运算符。示例代码Observable.just(10,11,12,13)??.last().subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,"call:"+integer);}});执行后输出13。它有一个重载方法,可以指定条件并获取满足条件的最后一项数据。修改以上代码如下Observable.just(10,11,12,13)??.last(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger<12;}}).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,"call:"+integer);}});此时最终输出的数据为11。这个算子和first一样有几个变体,比如lastOrDefault、TakeLast,具体效果可以自己测试。Skip该操作符是在传输数据之前跳过前几项数据。Observable.range(1,10).skip(6).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){Log.e(TAG,"call:"+integer);}});输出日志信息call:7call:8call:9call:10skip有两个重载方法,skip(longtime,TimeUnitunit)默认在计算调度器上执行。如果要更新UI操作,需要指定为AndroidSchedulers.mainThread(),当然还有重载方法skip(longtime,TimeUnitunit,Schedulerscheduler)可以指定调度器。需要注意的是,这两个重载方法的第一个参数不是跳过数据的个数,而是时间。Observable.interval(500,TimeUnit.MILLISECONDS).skip(2,TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(newSubscriber(){@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwablee){}@OverridepublicvoidonNext(LongaLong){tv.append("\n"+aLong);if(aLong>10){this.unsubscribe();}}});如上面代码,通过interval每500毫秒产生一条数据,使用skip设置跳过时间为2秒。并在数据大于10时取消订阅。skipLast与skip正好相反,忽略***Observable.range(1,10).skipLast(6).subscribe(newAction1())产生的n个数据项{@Overridepublicvoidcall(Integerinteger){Log.e(TAG,"call:"+integer);}});输出日志信息call:1call:2call:3call:4下面继续