RxJava2源码分析
本文基于RxJava2.2.1版本分析。
简介
官方介绍:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是一个处理异步操作的库、基于事件流的库。
其核心的东西不外乎两个, Observable(被观察者)和 Observer(观察者)。当观察者订阅了被观察者之后,被观察者可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后将结果交给观察者处理。
create
create()是创建Observable对象的方法之一,还有其他诸多的创建方法,这里主要了解下Observable的创建流程:
// 创建被观察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onComplete(); } });
进入create():
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // requireNonNull()很多地方会用到,顾名思义就是用来判空的 ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
进入onAssembly():
public static Observable onAssembly(@NonNull Observable source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { // 这里hook return apply(f, source); } return source; }
onAssembly()主要是用来hook的,默认使用中都没有hook,返回的就是入参ObservableCreate。我们来看一下ObservableCreate:
final ObservableOnSubscribe source; public ObservableCreate(ObservableOnSubscribe source) { this.source = source; }
这里我们看到ObservableCreate将最初我们创建的ObservableOnSubscribe包装了一层,用成员变量source记住。
至此,创建流程结束,我们得到了Observable< T >对象,其实就是ObservableCreate< T >.
subscribe
承接上文的示例代码:
// 创建观察者 Observer observer = new Observer() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } }; // 观察者和被观察者建立连接 observable.subscribe(observer);
进入subscribe():
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
这里的关键代码只有一句subscribeActual(observer);
,被观察者被订阅时真正被执行的方法:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter parent = new CreateEmitter(observer); // 这里首先调用了observer的onSubscribe() observer.onSubscribe(parent); try { // 然后调用了source的subscribe() // 这里的sourec就是之前保存的ObservableCreate // subscribe()就是我们自己一开始实现的方法 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
CreateEmitter是啥,它是一个事件的发射器。当我们在ObservableCreate的subscribe()中调用CreateEmitter的onNext()、onError()或者onComplete()方法时,它会去调用observer的onNext()、onError()或者onComplete()方法。
从CreateEmitter
一句中,大概可以猜到,CreateEmitter持有了observer,确实如此:
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } }
我们可以看到CreateEmitter的onNext()等方法被调用时,其中会调用对应的observer的onNext()等方法。
以上是最简单的流程,那在其中加入一个操作符会怎么样呢?我们来看一下map是做什么的。
map
map起到类型转换的作用,例子如下,源头observable发送的是String类型的数字,利用map转换成int型,最终在终点observer接受到的也是int类型数据:
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("1"); e.onComplete(); } }).map(new Function() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe() called with: d = [" + d + "]"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext() called with: value = [" + value + "]"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError() called with: e = [" + e + "]"); } @Override public void onComplete() { Log.d(TAG, "onComplete() called"); } });
来看它的源码:
public final Observable map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper)); }
抛开onAssembly(),好像只是创建了一个ObservableMap并且返回:
public ObservableMap(ObservableSource source, Function<? super T, ? extends U> function) { // 将传入的observable记住 super(source); // 将变换的方法记住 this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver(t, function)); }
这里和之前看到的ObservableCreate如出一辙,将上一个observable包装了起来。
然后当subscribeActual()执行的时候,去调用source的subscribe(),这里的source指的是上游的observable。并且创建了一个新的observer,MapObserver将下游的observer包装了一层。
如果多次map()的话,我们可以将函数调用的流程分为三步:
- 首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
- 当
observable.subscribe(observer);
之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。 - 当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。
假设当前的observer是MapObserver,在它的onNext()被调用时会调用它的变换方法,然后继续调用下游observer的onNext(),如下:
public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { // 调用变换方法mapper.apply(t) v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } // 调用下游observer的onNext() downstream.onNext(v); }
最终调用最后一个observer的onNext()。
subscribeOn
用来指定subscribe()发生的线程的:
public final Observable subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler)); }
这里返回了一个ObservableSubscribeOn:
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver parent = new SubscribeOnObserver(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
这里做法其实和以上的提到过的ObservableCreate和ObservableMap中的是一样的,包装了上游的observable,并且额外记录了subscribe()的线程调度器。
当subscribeActual()被执行的时候,会切换到线程调度器所指定的线程。
那么如果subscribeOn(Schedulers.xxx())切换线程N次,总是以第一次为准,或者说离源observable最近的那次为准,并且对其上面的代码生效。
observeOn
指定onNext()等方法执行的线程:
public final Observable observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize)); }
还是似曾相识的场景,创建了一个ObservableObserveOn:
public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } }
自然少不了对原来observable的包装,同时记录了线程调度器。
当从上游push消息过来的时候,我们会看到ObservableObserveOn会切换到所记录的线程调度器指定的线程:
public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
以上是RxJava2最基本方法的一些源码分析。
链式调用原理
我们可以将函数调用的流程分为三步:
- 首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
- 当
observable.subscribe(observer);
之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。 - 当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。
实战用例
以下是我写的demo的部分代码,其中模拟了两个RxJava的实战场景:
- 注册登录。
- 每次进入页面,按顺序请求接口,展示弹窗(避免多个弹窗重叠)。
public static final int REGISTER_SUCCESS_LOGIN_SUCCESS = 0; // 注册成功, 登录成功 public static final int REGISTER_SUCCESS_LOGIN_FAIL = 1; // 注册成功, 登录失败 public static final int REGISTER_FAIL = 2; // 注册失败 @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rx_java_test); } public void onLoginSuccessClick(View view) { gotoRegister(REGISTER_SUCCESS_LOGIN_SUCCESS); } public void onLoginFailClick(View view) { gotoRegister(REGISTER_SUCCESS_LOGIN_FAIL); } public void onRegisterFailClick(View view) { gotoRegister(REGISTER_FAIL); } /** * 按序加载信息 * * @param view */ public void onLoadInfoInSequenceClick(View view) { int index = 0; loadSingleInfo(index).observeOn(Schedulers.io()) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return loadSingleInfo(integer); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { ToastUtil.showToast("new info are all loaded"); } }); } /** * 发起注册请求 * * @param type */ private void gotoRegister(final int type) { register(type).subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { return login(type); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { ToastUtil.showToast("sending request..."); } @Override public void onNext(Integer integer) { ToastUtil.showToast("login successfully"); } @Override public void onError(Throwable e) { ToastUtil.showToast(e.getMessage()); } @Override public void onComplete() { } }); } /** * 注册 * * @param type * @return */ private Observable register(final int type) { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { // 模拟网络请求 SystemClock.sleep(1000); if (registerSuccess(type)) { // 注册成功 emitter.onNext(type); } else { // 注册失败 emitter.onError(new Exception("fail to register!")); } } }); } /** * 登录 * * @param type * @return */ private Observable login(final int type) { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { // 模拟网络请求 SystemClock.sleep(1000); if (loginSuccess(type)) { // 登录成功 emitter.onNext(type); } else { // 登录失败 emitter.onError(new Exception("fail to login!")); } } }); } /** * 是否注册成功 * * @param type * @return */ private boolean registerSuccess(int type) { return type != REGISTER_FAIL; } /** * 是否登录成功 * * @param type * @return */ private boolean loginSuccess(int type) { return type == REGISTER_SUCCESS_LOGIN_SUCCESS; } /** * 获取单个信息 * * @param index * @return */ private Observable loadSingleInfo(final int index) { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(final ObservableEmitter emitter) throws Exception { // 模拟网络请求 SystemClock.sleep(200); if (hasLoaded()) { emitter.onNext(index + 1); } else { runOnUiThread(new Runnable() { @Override public void run() { showAlertDialog(index, new DialogInterface.OnDismissListener() { @Override public void onDismiss(DialogInterface dialog) { emitter.onNext(index + 1); } }); } }); } } }); } /** * 是否展示过该信息 * * @return */ private boolean hasLoaded() { long currTime = SystemClock.currentThreadTimeMillis(); return currTime % 2 == 0; } /** * 展示某个信息对应弹窗 * * @param index * @param dismissListener */ private void showAlertDialog(int index, final DialogInterface.OnDismissListener dismissListener) { new AlertDialog.Builder(this) .setTitle("Dialog index " + index) .setOnDismissListener(new DialogInterface.OnDismissListener() { @Override public void onDismiss(DialogInterface dialog) { if (dismissListener != null) { dismissListener.onDismiss(dialog); } } }) .create() .show(); }
demo的源码地址。
参考:
1.rxjava源码分析
2.RxJava2 源码解析(一)
3.RxJava2 源码解析(二)
更多相关文章
- Intent的定义及用法
- android计时器 message+handler; timer+timertask
- 禁止viewpager左右滑动
- Android(安卓)Java 与 C++ 调用过程中的常量,路径名、文件名、后
- 安卓全局获取Context
- Android(安卓)Studio之导入外部so库
- Identifying Sensors and Sensor Capabilities
- android ViewGroup左右滑屏方法2
- android测量心率的实现方法