当前位置: 首页 > 后端技术 > Java

RxJava2看这篇文章就够了

时间:2023-04-01 23:03:33 Java

0。简介RxJava其实提供了一套异步编程的API,这套API是基于观察者模式的,并且是链式调用的,所以用RxJava写的代码逻辑会非常简洁。RxJava有以下三个基本元素:观察者(Observable)观察者(Observer)订阅(subscribe)下面说说以上三者是如何配合的:首先在gradle文件中添加依赖:implementation'io.reactivex.rxjava2:rxjava:2.1.4'实现'io.reactivex.rxjava2:rxandroid:2.0.2'创建一个可观察对象:>e)抛出异常{Log.d(TAG,"===========================currentThread名称:"+Thread.currentThread().getName());e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();}});创建观察者:Observerobserver=newObserver(){@OverridepublicvoidonSubscribe(Disposabled){Log.d(TAG,"========================订阅”);}@OverridepublicvoidonNext(Integerinteger){Log.d(TAG,"========================onNext"+integer);}@OverridepublicvoidonError(Throwablee){Log.d(TAG,"======================onError");}@OverridepublicvoidonComplete(){Log.d(TAG,"======================onComplete");}};observable.subscribe(observer);复制代码这里其实也可以使用链接方式调用:Observable.create(newObservableOnSubscribe(){@Overridepublicvoidsubscribe(ObservableEmittere)throwsException{Log.d(TAG,"=========================当前线程名称:"+Thread.currentThread().getName());e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();}}).subscribe(newObserver(){@OverridepublicvoidonSubscribe(Disposabled){Log.d(TAG,"======================onSubscribe");}@OverridepublicvoidonNext(Integerinteger){Log.d(TAG,"======================onNext"+integer);}@OverridepublicvoidonError(Throwablee){Log.d(TAG,"=======================onError");}@OverridepublicvoidonComplete(){Log.d(TAG,"=======================onComplete");}});观察者复制代码发送的事件如下,总结如下表:事件类型函数onNext()发送的事件发生时,观察者会回调onNext()方法onError()。当事件发送时,观察者会回调onError()方法。事件发送后,其他事件将不再继续发送。onComplete()事件发送时,observe后者会回调onComplete()方法。该事件发送后,其他事件将不再继续发送。实际上,RxJava可以比作果汁机。家里的水果种类繁多(待发原始数据),你想榨点果汁喝,那你得想想你要喝什么果汁?如果你想喝鳄梨、梨和柠檬汁,那么你就得把这三种水果混合在一起榨汁(用各种算子把你要发送给观察者的数据进行转换),榨完之后你就可以喝你想喝的了我想要的果汁(将处理后的数据发送给观察者)总结如下:2018/5/26/1639a8ee56b13c41~tplv-t2oaga2asx-zoom-in-crop-mark:3024:0:0:0.awebp)下面解释一下RxJava的各种常用算子。1.Creationoperator以下是对创建observed的各种operator进行说明。1.1create()方法预览:publicstaticObservablecreate(ObservableOnSubscribesource)复制代码有什么用:如何创建一个observable:Observableobservable=Observable.create(newObservableOnSubscribe(){@Overridepublicvoidsubscribe(ObservableEmittere)throwsException{e.onNext("HelloObserver");e.onComplete();}});上面的代码很简单,创建了ObservableOnSubscribe并重写了它的subscribe方法,可以通过ObservableEmitter发射器向观察者发送事件。下面创建一个观察者来验证被观察对象是否创建成功。Observerobserver=newObserver(){@OverridepublicvoidonSubscribe(Disposabled){}@OverridepublicvoidonNext(Strings){Log.d("chan","=============onNext"+s);}@OverridepublicvoidonError(Throwablee){}@OverridepublicvoidonComplete(){Log.d("chan","=============onComplete");}};observable.subscribe(观察者);复制代码并打印结果:05-2016:16:50.65422935-22935/com.example.louder.rxjavademoD/chan:=============onNextHelloObserver==============onComplete复制代码1.2just()方法预览:publicstaticObservablejust(Titem)......publicstaticObservablejust(Titem1,Titem2,Titem3,Titem4,Titem5,Titem6,Titem7,Titem8,Titem9,Titem10)是为了什么?创建一个可观察对象并发送事件。发送的事件数不能超过10个。如何使用?Observable.just(1,2,3).subscribe(newObserver(){@OverridepublicvoidonSubscribe(Disposabled){Log.d(TAG,"=================onSubscribe");}@OverridepublicvoidonNext(Integerinteger){Log.d(TAG,"=================onNext"+integer);}@OverridepublicvoidonError(Throwablee){Log.d(TAG,"=================onError");}@OverridepublicvoidonComplete(){日志。d(TAG,"=================onComplete");}});复制代码上面的代码直接使用了链式调用,代码也很简单,这里就不详细说了,看打印结果:05-2016:27:26.93823281-23281/?D/chan:==================onSubscribe==================onNext1=================onNext2===================onNext3==================onComplete复制代码1.3Fromoperator1.3.1fromArray()方法预览:publicstaticObservablefromArray(T...items)作者:余刚说链接:有什么用?这个方法和just()类似,只不过fromArray可以传入10个以上的变量,并且可以传入一个数组。如何使用?Integerarray[]={1,2,3,4};Observable.fromArray(array).subscribe(newObserver(){@OverridepublicvoidonSubscribe(Disposabled){Log.d(TAG,"==================onSubscribe");}@OverridepublicvoidonNext(Integerinteger){Log.d(TAG,"=================onNext"+integer");}@OverridepublicvoidonError(Throwablee){Log.d(TAG,"=================onError");}@OverridepublicvoidonComplete(){Log.d(TAG,"==================onComplete");}});copycode代码和just()基本一样,直接看打印结果:05-2016:35:23.79723574-23574/com.example.louder.rxjavademoD/chan:=================================================================================================================================================================================================================================onNext1==================onNext2=================onNext3=================onNext4===================onComplete1.3.2fromCallable()方法预览:publicstaticObservablefromCallable(Callablesupplier)复制代码有为了什么?这里的Callable就是java.util.concurrent中的Callable。Callable和Runnable的用法基本相同,只是会返回一个结果值,发送给观察者。如何使用它?Observable.fromCallable(newCallable(){@OverridepublicIntegercall()throwsException{return1;}}).subscribe(newConsumer(){@Overridepublicvoidaccept(Integerinteger)throwsException{Log.d(TAG,"=================accept"+integer);}});05-2613:01:43.0096890-6890/?D/chan:==================accept1copycode1.3.3fromFuture()方法预览:publicstaticObservablefromFuture(Futurefuture)copy代码是做什么用的?参数中的Future就是java.util.concurrent中的Future。Future的作用是增加cancel()等方法来操作Callable。它可以通过get()方法获取Callable返回的值。