谜之RxJava (一) —— 最基本的观察者模式
16lz
2021-01-25
最近在Android
界,最火的framework大概就是RxJava
了。
扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJava的过程中受益匪浅。经过阅读这篇文章后,我们来看下RxJava
的源码,揭开它神秘的面纱。
这里准备分几篇文章写,为了能让自己有个喘口气的机会。
先来上个最最简单的,经典的Demo。
Demo
Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); }}).subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("rx", s); }});
这段代码产生的最终结果就是在Log里会出现hello。
看下这段代码的具体流程吧。
这里有2个函数create
和subscribe
,我们看看create
里面干了啥。
OnSubscribe对象
public final static Observable create(OnSubscribe f) { return new Observable(hook.onCreate(f));}// constructorprotected Observable(OnSubscribe f) { this.onSubscribe = f;}
这里的hook
是一个默认实现,里面不做任何事,就是返回f
。我们看见create
只是给Observable
的onSubscribe
赋值了我们定义的OnSubscribe
。
Subscriber对象
来看下subscribe
这个函数做了什么事
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this);}private static Subscription subscribe(Subscriber<? super T> subscriber, Observable observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); }}
我们看到,这里我们的subscriber
被SafeSubscriber
包裹了一层。
if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber(subscriber);}
然后开始执行工作流
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);return hook.onSubscribeReturn(subscriber);
默认的hook
只是返回我们之前定义的onSubscribe
,这里调用的call
方法就是我们在外面定义的
new Observable.OnSubscribe() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); }})
我们调用传入的subscriber
对象的onNext
方法,这里的subscriber
是SafeSubscriber
在SafeScriber
中
public void onNext(T args) { try { if (!done) { actual.onNext(args); } } catch (Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); // handle errors if the onNext implementation fails, not just if the Observable fails onError(e); }}
actual
就是我们自己定义的subscriber
。 原来SafeSubscriber
只是为了帮我们处理好异常,以及防止工作流的重复。
这是RxJava
最最基本的工作流,让我们认识到他是怎么工作的。之后我们来讲讲其中的细节和其他神奇的内容。
【谜之RxJava (二) —— Magic Lift】
欢迎关注我Github 以及 weibo、@Gemini
更多相关文章
- 理解Android中的引用类型
- android中json数据的解析
- android 进程间通信原理
- Android(安卓)Hook 机制之简单实战
- 《第一行代码--Android》读书笔记之碎片
- Android(安卓)不可缺少的异步(Thread、Handler、AsyncTask)实例解
- Android(安卓)通过源码解析 Fragment 启动过程
- Android(安卓)ORM 框架之 greenDAO
- Android中Service和Activity相互通信示例代码