当前位置: 首页 > 科技观察

理解RxJava线程模型

时间:2023-03-21 11:32:12 科技观察

RxJava是目前超火的框架,便捷的线程切换一直被人们津津乐道。本文从源码的角度深入理解RxJava的线程模型。(注:本文中很多代码并不是RxJava的原始源码,而是用来说明逻辑的伪代码。)在RxJava中体验切换线程非常简单,比如最常见的异步线程处理,主要线程回调模型,可以非常优雅的使用如下代码进行处理:)).subscribe(obj->System.out.print(String.valueOf(obj)));如上,subscribeOn(Schedulers.io())保证doExpensiveWork函数发生在io线程,observeOn(AndroidSchedulers.mainThread())保证subscribe回调发生在Android主线程。因此,这自然引出了本文的重点。subscribeOn和observeOn有什么区别?流程分析要回答上面的问题,首先我们需要对RxJava的流程有个大概的了解,从一个Observable的生成到最后执行subscribe,中间可以经历n次变换,每次变换都会生成一个新的Observable,就像奥运会开幕式的火炬传递一样,每次都会将火炬传递给下一个人,最后一个火炬手由最后一个火炬手点燃,也就是最后的订阅操作是最后一个Observable,所以有必须是每个Observable之间的连接。这种关系体现在代码中,每一个转换后的Observable都会持有前一个Observable中的OnSubscribe对象的引用(Observable.create函数需要的参数),最后Observable的subscribe函数中的关键代码就是这句话:observable.onSubscribe.call(subscriber)这个observable是最后一个转化的observable,那么onSubscribe对象是谁呢?一个observable怎么能不经过任何转换就直接执行subscribe,当然是我们在create中传入的onSubscribe,但是如果是通过map,reduce等进行转换,这个onSubscribe显然应该是从转换后的observable中传入的参数,大部分的转换最终都交给了lift函数:);}所以上面说的onSubscribe对象应该是OnSubscribeLift的一个实例,而这个OnSubscribeLift接收的两个参数,一个是上面说的前面Observable中的OnSubscribe对象,operator是每个transformation的一个抽象接口。我们看一下这个OnSubscribeLift对象的调用方法:publicvoidcall(Subscribero){Subscriberst=operator.call(o);parent.call(st);}operator和parent就是这两个上面提到的参数,可以看出operator接口会有一个call方法,接收一个Subscriber,返回一个新的Subscriber对象,接下来的parent.call(st)就是回调onSubscribe的call方法上可观察的,等等。直到订阅截止日期。这样我们首先明确了一条线,就是安装了一个observablesubscribe之后,OnSubscribe的调用顺序是从后往前。这就引出了另一个问题。从上面的代码我们可以看出在执行parent.call(st)之前已经执行了operator.call(o)方法。如果在call方法中执行转换操作,好像转换也会从后面传到前面?所以这个operator.call方法肯定没有我们想象的那么简单。这里以map算子为例,见源码:publicSubscribercall(finalSubscribers){MapSubscriberparent=newMapSubscriber(o,transformer);o.add(parent);returnparent;}这里并没有进行转换操作,而是生成了一个MapSubscriber对象。这里需要注意MapSubscriber构造函数的两个参数。transformer是实际需要转换的Func1对象。哪个是订阅者?这是什么意思?例如🌰:o1->o2->subscribe(Subscribers0);o1经过map操作后变成o2,o2执行subscribe操作。如果理解了上面的内容,就可以知道,这个段过程的执行顺序是,s0会先传递给o2,o2的lift操作会把s0转化为s1再传递给o1。那么,在生成o2的map操作的call(finalSubscribers)方法中,s值给谁呢?是s0还是s1?答案应该是s0,也就是它的下一级Subscriber。原因很简单。call方法返回的MapSubscriber对象的父对象是s1。那么,我们来看看MapSubscriber的onNext方法做了什么?publicvoidonNext(Tt){Rresult;result=transformer.call(t);s.onNext(result);}很清楚,先进行转换,再回调下一级的onNext函数。至此,我们对一个Observable从初始化,到改造,到订阅的全过程有了一个大致的了解。简单来说就是一个o1通过一个map变成o2。可以理解为o2在o1上做了一层hook,会经过两个过程。首先,onSubscribe对象的调用流程会从o2流向o1。我们简称为流程a。到达o1后,o1会启动Subscriber的onNext系列进程,简称进程b,进程b是真正执行转换的进程,它的方向是从o1到o2。明白了这一点,我们就可以更进一步的理解RxJava中的线程模型了。tip:一定要深刻理解进程a和进程b的区别。这对于理解下面的线程切换至关重要。切换方法RxJava对线程模型的抽象是Scheduler,它是一个抽象类,包含一个抽象方法:publicabstractWorkercreateWorker();这个工人在哪里神圣?它实际上是Scheduler的一个抽象内部类,主要包含两个抽象方法:1)publicabstractSubscriptionschedule(Action0action);2)publicabstractSubscriptionschedule(finalAction0action,finallongdelayTime,finalTimeUnitunit);可见Worker是线程执行的主力军。两种方法之一用于立即执行任务,另一种用于执行延迟任务。Scheduler是Worker的工厂,用来对外提供Worker。RxJava中常用的线程切换方式有两种,分别是subscribeOn转换和observeOn转换,两者都接收Scheduler的参数。接下来从源码层面比较两者的区别。subscribeOn先看subscribeOn的部分publicfinalObservablesubscribeOn(Schedulerscheduler){returncreate(newOperatorSubscribeOn(this,scheduler));}新建一个Observable,传入的参数是OperatorSubscribeOn,显然this应该是其中一个OnSubscribe的实现,注意这个OperatorSubscribeOn调用的实现方法:publicvoidcall(finalSubscribersubscriber){finalWorkerinner=scheduler.createWorker();inner.schedule(newAction0(){@Overridepublicvoidcall(){finalThreadt=Thread.currentThread();Subscribers=newSubscriber(subscriber){@OverridepublicvoidonNext(Tt){subscriber.onNext(t);}...};source.unsafeSubscribe(s);}});}这是更多critical,上去文章中提到了进程a和进程b。首先明确这个调用方法的执行时机是进程a,也就是说这次调用发生在进程b之前。在调用方法中,首先通过外部传入的调度器创建Worker-inner对象。然后在里面执行了一段代码,神奇的是,Action0中的call方法的代码是在工作线程中执行的,也就是此时切换了进程。注意代码source.unsafeSubscribe(s)的最后一句,source表示创建OperatorSubscribeOn对象是最后传入的Observable,这句源码如下:publicfinalSubscriptionunsafeSubscribe(Subscribersubscriber){returnSubscribe.call(subscriber);}类似于lift方法中上面提到的OnSubscribeLift对象的call方法中的parent.call(st),就是通过onSubscribe将当前Observable与上一个Observable关联起来。至此,我们可以大致理解subscribeOn的原理,它会在进程a中进行线程切换,但是因为进程a实际上是Observable之间串行关系的代码,它是从后面的Observable流向前面的Observable,隐含的意思其中就是,对于进程b,最早的subscribeOn会阻塞后面的subscribeOn!例如:Observable.just("magic").map(file->doExpensiveWork(file)).subscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(obj->doAction(obj)));无论是这段代码中的doExpensiveWork函数还是doAction函数,都会在io线程上触发。observeOn理解subscribeOn,这样会更容易理解observeOn。observeOn函数最终会转化为这个函数:publicfinalObservableobserveOn(Schedulerscheduler,booleandelayError,intbufferSize){returnlift(newOperatorObserveOn(scheduler,delayError,bufferSize));}很明显,这是一个提升操作。我们需要关注OperatorObserveOn,查看其调用方式:publicSubscribercall(Subscriberchild){ObserveOnSubscriberparent=newObserveOnSubscriber(scheduler,child,delayError,bufferSize);parent.init();returnparent;}这里返回的是一个ObserveOnSubscriber对象,我们注意这个Subscriber的onNext函数,publicvoidonNext(finalTt){schedule();}它只是简单的执行了schedule函数之后,我们来看看这个时间表:protectedvoidschedule(){recursiveScheduler.schedule(this);}这里的recursiveScheduler.schedule是什么?不神奇,是ObserveOnSubscriber构造函数传入的scheduler创建的worker:this.recursiveScheduler=scheduler.createWorker();于是又产生了magic,observeOn在其onNext中进行线程切换。这个onNext什么时候执行?从上面可以看出是在进程b中。所以observeOn会影响它后面的进程,直到下一个observeOn出现或者结束。外围技巧线程模型选择RxJava为我们内置了几种线程模型,主要区别如下:计算里面是一个线程,线程池的大小和cpu核数:),这种线程比较适合Purecpu的操作,比如求100亿以内的斐波那契数之和。newThread每次创建createWorker时都会生成一个新线程。io类似于newThread,但是里面是一个不在线的线程池。一般来说,使用io优于newThread,因为它内部的线程池可以复用线程。Immediate在当前线程中立即执行。Trampoline在当前线程中执行。与immediate不同的是,它不会立即执行,而是存放在队列中,等待当前Scheduler中的其他任务执行完毕。这个我们经常用的不多,主要服务于repeat、retry等特殊的转换操作。from接收一个Executor,允许我们自定义Scheduler。Scheduler.Worker抢了风头。其实RxJava中的Worker是可以抽出来自己用的。如下写法所示,一个新开的线程执行一个action。Scheduler.Workerworker=Schedulers.newThread().createWorker();worker.schedule(newAction0(){@Overridepublicvoidcall(){thrownewRuntimeException("惊喜");}});当然,你要选择合适的时间关闭(unsubscribe)worker来释放资源。带有自己光环的运算符一些运算符有一个默认的线程模型。比如上面提到的repeat和retry默认会在trampoline线程模型下执行,buffer和debounce默认会切换到computation。这里我就不深入讨论了,大家记住,有些角色在遇到某些问题的时候,是有自己的装备和光环的。综上所述,理解RxJava的线程模型最重要的是要区别于我们通常对异步的理解:doAsync("magic",newCallback(){@Overridepublicvoidhandle(Objectmsg){a....}});b)....这是我们之前经常写的代码。通常只区分UI线程和非UI线程。doAsync函数启动后,程序被分割,一个线程在执行一个doAsync,另一个线程在执行b段代码。RxJava采用了一种新的方法并将整个线程抽象化。RxJava的处理顺序就像一个流。这不仅体现在写成链状的代码上,也体现在逻辑上。对于Observable本身,改变线程只是改变流的流动。轨道不用于调车。Android中常用非UI线程来处理数据,UI线程显示数据只是这种管道转换的一种方式。就我个人的理解,将RxJava的线程切换理解为异步、非异步、阻塞、非阻塞是不合适的。暂时只能理解为改造。如此惊人!