当前位置: 首页 > 科技观察

运算符在RxJava中究竟做了什么?

时间:2023-03-22 01:10:22 科技观察

RxJava今年彻底火了,最厉害的还是算子。我以前只知道怎么用。这几天看了源码,大致摸清了operator的工作流程。今天,我将与您分享。每个人。如有不妥之处,请指教。今天我们以过滤器为例,看代码:Integer[]datas={1,2,3,4,5,6,7,8,9,10};Observable.from(datas).filter(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>=5;}}).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){mText。append(integer.toString()+",");}});一个很简单的例子,使用filter运算符filter查找大于等于5的数字。我们点进去看看源码中filter做了什么(newOnSubscribeFilter(this,predicate));}调用create()方法等,我们什么时候使用create()方法的?我们在创建Observable时也使用了create()方法。我们创建了一个新的Observable并返回了它。是不是意味着我们的订阅者其实订阅了这个新的Observable,我们继续看create方法。create方法需要的参数是一个OnSubscribe对象。那么我们可以确认OnSubscribeFilter是OnSubscribe的一个实现类。让我们点进去看看。publicfinalclassOnSubscribeFilterimplementsOnSubscribe{finalObservablesource;finalFunc1predicate;publicOnSubscribeFilter(Observablesource,Func1predicate){this.source=source;this.predicate=predicate;}果然如我们所料,OnSubscribeFilter就是OnSubscribe的实现类。我们看他的构造方法,传递两个参数。第一个参数是一个Observable对象和一个Func1。第一个参数是我们自己创建的Observable,第二个参数是我们在外面写的Func1,然后保存。我们都知道订阅subscribe()时OnSubscribe的call()方法。看看OnSubscribeFilter的call()方法做了什么@Overridepublicvoidcall(finalSubscriberchild){FilterSubscriberparent=newFilterSubscriber(child,predicate);child.add(parent);source。unsafeSubscribe(parent);}有个FilterSubscriber,什么鬼,看看他是什么}@OverridepublicvoidonError(Throwablee){if(done){RxJavaHooks.onError(e);return;}done=true;actual.onError(e);}@OverridepublicvoidonCompleted(){if(done){return;}actual.onCompleted();}@OverridepublicvoidsetProducer(Producerp){super.setProducer(p);actual.setProducer(p);}}订阅者的子类,我们看一下它的构造方法,两个参数,一个Subscriber,一个Func1。当我们创建一个对象时,Subscriber对象就是我们从外界传入的真实观察者。Func1允许我们在创建OnSubscribeFilter对象时传入,也就是我们在外面定义的Func1。回过头来,我们继续看OnSubscribeFilter的调用方法。我们看到source.unsafeSubscribe(parent),source就是我们原来的外部Observable,它订阅了FilterSubscriber对象。我们在他的onNext方法中看到他是根据func1.call(t)的返回值来判断是否让我们真正的外部观察者调用onNext方法。看到这里,你是不是恍然大悟,什么?我什至不知道你在说什么,呃,然后我们作为一个整体反复。我们的外部代码,subscribe()的时候,Subscriber没有订阅我们自己写的Observable,Subscriber订阅的是filter方法返回的新的Observable对象,所以订阅的时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter就是我们的onSubscribe订阅的观察者对象,在OnSubscribeFilter的call()方法中,我们让我们包装好的FilterSubscriber订阅我们原来的观察者,也就是我们在外面生成的Observable。我们在外部Observable的onSubscribe对象的call方法中获取到的观察者就是FilterSubscriber对象,我们调用的onNext会回调到FilterSubscriber的onNext方法。在FilterSubscriber的onNext方法中,我们根据传入的Func1判断是否回调真实Subscriber的onNext方法。当为真时,我们回调我们外部观察者的onNext方法,也起到了过滤的作用。这就是Filter的全过程。让我们测试一下我们的小结论:());subscriber.onNext(5);}}).filter(newFunc1(){@OverridepublicBooleancall(Integerinteger){returninteger>0;}}).subscribe(newAction1(){@Overridepublicvoidcall(Integerinteger){}});