最近在Android界,最火的framework大概就是RxJava了。
扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJava的过程中受益匪浅。经过阅读这篇文章后,我们来看下RxJava的源码,揭开它神秘的面纱。

这里准备分几篇文章写,为了能让自己有个喘口气的机会。

先来上个最最简单的,经典的Demo。

Demo

Observable.create(new Observable.OnSubscribe() {    @Override    public void call(Subscriber<? super String> subscriber) {        subscriber.onNext("hello");    }}).subscribe(new Subscriber() {    @Override    public void onCompleted() {    }    @Override    public void onError(Throwable e) {    }    @Override    public void onNext(String s) {        Log.d("rx", s);    }});

这段代码产生的最终结果就是在Log里会出现hello。

看下这段代码的具体流程吧。
这里有2个函数createsubscribe,我们看看create里面干了啥。

OnSubscribe对象

public final static  Observable create(OnSubscribe f) {    return new Observable(hook.onCreate(f));}// constructorprotected Observable(OnSubscribe f) {    this.onSubscribe = f;}

这里的hook是一个默认实现,里面不做任何事,就是返回f。我们看见create只是给ObservableonSubscribe赋值了我们定义的OnSubscribe

Subscriber对象

来看下subscribe这个函数做了什么事

public final Subscription subscribe(Subscriber<? super T> subscriber) {    return Observable.subscribe(subscriber, this);}private static  Subscription subscribe(Subscriber<? super T> subscriber, Observable observable) { // validate and proceed    if (subscriber == null) {        throw new IllegalArgumentException("observer can not be null");    }    if (observable.onSubscribe == null) {        throw new IllegalStateException("onSubscribe function can not be null.");        /*         * the subscribe function can also be overridden but generally that's not the appropriate approach         * so I won't mention that in the exception         */    }        // new Subscriber so onStart it    subscriber.onStart();        /*     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls     * to user code from within an Observer"     */    // if not already wrapped    if (!(subscriber instanceof SafeSubscriber)) {        // assign to `observer` so we return the protected version        subscriber = new SafeSubscriber(subscriber);    }    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.    try {        // allow the hook to intercept and/or decorate        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);        return hook.onSubscribeReturn(subscriber);    } catch (Throwable e) {        // special handling for certain Throwable/Error/Exception types        Exceptions.throwIfFatal(e);        // if an unhandled error occurs executing the onSubscribe we will propagate it        try {            subscriber.onError(hook.onSubscribeError(e));        } catch (OnErrorNotImplementedException e2) {            // special handling when onError is not implemented ... we just rethrow            throw e2;        } catch (Throwable e2) {            // if this happens it means the onError itself failed (perhaps an invalid function implementation)            // so we are unable to propagate the error correctly and will just throw            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);            // TODO could the hook be the cause of the error in the on error handling.            hook.onSubscribeError(r);            // TODO why aren't we throwing the hook's return value.            throw r;        }        return Subscriptions.unsubscribed();    }}

我们看到,这里我们的subscriberSafeSubscriber包裹了一层。

if (!(subscriber instanceof SafeSubscriber)) {    // assign to `observer` so we return the protected version    subscriber = new SafeSubscriber(subscriber);}

然后开始执行工作流

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);return hook.onSubscribeReturn(subscriber);

默认的hook只是返回我们之前定义的onSubscribe,这里调用的call方法就是我们在外面定义的

new Observable.OnSubscribe() {    @Override    public void call(Subscriber<? super String> subscriber) {        subscriber.onNext("hello");    }})

我们调用传入的subscriber对象的onNext方法,这里的subscriberSafeSubscriber
SafeScriber

public void onNext(T args) {    try {        if (!done) {            actual.onNext(args);        }    } catch (Throwable e) {        // we handle here instead of another method so we don't add stacks to the frame        // which can prevent it from being able to handle StackOverflow        Exceptions.throwIfFatal(e);        // handle errors if the onNext implementation fails, not just if the Observable fails        onError(e);    }}

actual就是我们自己定义的subscriber。 原来SafeSubscriber只是为了帮我们处理好异常,以及防止工作流的重复。

这是RxJava最最基本的工作流,让我们认识到他是怎么工作的。之后我们来讲讲其中的细节和其他神奇的内容。

【谜之RxJava (二) —— Magic Lift】

欢迎关注我Github 以及 weibo、@Gemini

更多相关文章

  1. 理解Android中的引用类型
  2. android中json数据的解析
  3. android 进程间通信原理
  4. Android(安卓)Hook 机制之简单实战
  5. 《第一行代码--Android》读书笔记之碎片
  6. Android(安卓)不可缺少的异步(Thread、Handler、AsyncTask)实例解
  7. Android(安卓)通过源码解析 Fragment 启动过程
  8. Android(安卓)ORM 框架之 greenDAO
  9. Android中Service和Activity相互通信示例代码

随机推荐

  1. *Android(安卓)多线程下载 仿下载助手(改
  2. Android之Service组件
  3. Android设置Dialog透明度、黑暗度方法
  4. android常用系统服务
  5. Android(安卓)中获取res资源
  6. Android实现Tab布局的4种方式(Fragment+Ta
  7. 转:Android(安卓)学习笔记 1
  8. android 判断手机为小米
  9. 第三部分 MediaPlayer的主要实现分析
  10. Android(安卓)Parcel和Parcelable类