作为近两年好评如潮的Rx系列,最近研究了下Android用到的RxJava,看了扔物线的给 Android 开发者的 RxJava 详解后还是有点不知其所以然,就研究了下源码。首先看了比较常用的线程切换。把一个流程搞明白了,其他的都很类似。
Rx的介绍以及Api可以看Rx文档

本文是对常用的的io线程加载数据并在主线程更新界面的流程分析。
首先说一下Android 里导入RxJava的时候同时导入的RxAndroid框架,看源码其实发现里面只有两个主要的类,HandlerScheduler和AndroidSchedulers,这两个类保证了可以切换到Android主线程。后面会有提到。

首先看下线程切换的例子

      Observable.create(new Observable.OnSubscribe() {            @Override            public void call(Subscriber<? super String> subscriber) {                Log.i(TAG,Thread.currentThread().getName());            }        })                .subscribeOn(Schedulers.io())                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Subscriber() {                    @Override                    public void onCompleted() {                    }                    @Override                    public void onError(Throwable e) {                    }                    @Override                    public void onNext(String s) {                        Log.i(TAG,Thread.currentThread().getName());                    }                });

这段程序通过log可以看到执行的效果是call方法执行在io子线程中,onNext执行在主线程中。实际应用中就可以把网络请求等耗时操作放在call方法,然后再onNext里更新界面。

下面分析每一步都做了什么,为了便于理解,从最后的subscribe()开始往前分析,本文只放了主要的代码。

1.subscribe()

通过查看源码,把创建的subscriber传到了下面的方法。

private static  Subscription subscribe(Subscriber<? super T> subscriber, Observable observable) {    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);    return hook.onSubscribeReturn(subscriber);}

这里的call方法其实执行的就是前面创建的Observable里的OnSubscribe的call方法。并把subscriber
传进去。这条链子就开始工作了,如果前面就是Observable.create(),就可以在call方法里调用
subscriber.onNext。这样并没有什么意义。下面看observeOn()

2.observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()返回的是一个HandlerScheduler,

private static class MainThreadSchedulerHolder {    static final Scheduler MAIN_THREAD_SCHEDULER =                new HandlerScheduler(new Handler(Looper.getMainLooper()));}

可以看到HandlerScheduler包含了一个包含主线程Looper的Handler。后面就要使用这个Handler来更新UI

下面看observeOn相关方法

    public final Observable observeOn(Scheduler scheduler) {        return lift(new OperatorObserveOn(scheduler));    }    public final  Observable lift(final Operator<? extends R, ? super T> operator) {        return new Observable(new OnSubscribe() {            @Override            public void call(Subscriber<? super R> o) {                    Subscriber<? super T> st = hook.onLift(operator).call(o);                    st.onStart();                    onSubscribe.call(st);            }        });    }

可以看出,lift重新封装了一个Observable,根据第一步说的,之后的subscribe()执行,就会调用这里的
Observable 里的OnSubscribe的call方法,并且把subscriber1传进去。
然后到了第一行 hook.onLift(operator).call(o)。
这里执行的是上面new OperatorObserveOn()创建的OperatorObserveOn对象的call方法

    public Subscriber<? super T> call(Subscriber<? super T> child) {            ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child);            parent.init();            return parent;    }

返回了一个ObserveOnSubscriber对象st ,内部包含有subscriber1,注意这里把前面创建的HandlerScheduler传了进去。跟踪这个st对象的onNext方法可以找到HandlerScheduler里

handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

到这里在主线程更新UI就可以实现了。总结一下就是,

这个st对象的onNext方法的逻辑实现了主线程里执行subscriber1的onNext()方法,方便叙述,这里起名字为主线程st

而onSubscribe.call(st)的onSubscribe是哪里的呢,先起个名字叫onSubscribe1,后面会看到

3.subscribeOn(Schedulers.io())方法

先看一下Schedulers.io(),返回了一个CachedThreadScheduler作为参数,和HandlerScheduler不同的是,它实现了通过线程池在子线程里执行的逻辑

而subscribeOn看到源码

    public final Observable subscribeOn(Scheduler scheduler) {        if (this instanceof ScalarSynchronousObservable) {            return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);        }        return nest().lift(new OperatorSubscribeOn(scheduler));    }    public final Observable> nest() {        return just(this);    }    public final static  Observable just(final T value) {        return ScalarSynchronousObservable.create(value);    }
    public static final  ScalarSynchronousObservable create(T t) {        return new ScalarSynchronousObservable(t);    }    private final T t;    protected ScalarSynchronousObservable(final T t) {        super(new OnSubscribe() {            @Override            public void call(Subscriber<? super T> s) {                s.onNext(t);                s.onCompleted();            }        });        this.t = t;    }

可以发现just传递的this也就是包含有需要执行在子线程的Observable,调用最后just方法创建了一个特殊的ScalarSynchronousObservable,其中t就是上面传递的Observable

又到了lift()方法

   public final  Observable lift(final Operator<? extends R, ? super T> operator) {        return new Observable(new OnSubscribe() {            @Override            public void call(Subscriber<? super R> o) {                    Subscriber<? super T> st = hook.onLift(operator).call(o);                    st.onStart();                    onSubscribe.call(st);            }        });    }

这时需要和刚才第二步结合起来,第二步的时候的onSubscribe1就是这里的new OnSubscribe(),因此onSubscribe1的call方法就执行了上面的call方法,有种递归的感觉,但是这里call方法传的值是主线程st,在经过hook.onLift(operator).call(o),去除复杂逻辑通俗点说就是,生成了一个包含有处理异步逻辑OnNext方法的Subscriber, 并把主线程st通过call(o)传递进去,把在子线程操作的Observable从OnNext方法传递过去。这样就可以在子线程执行call方法,并用主线程st通过handler更新UI。

这里onSubscribe.call(st)的onSubscribe其实就是通过nest()方法生成的ScalarSynchronousObservable。他的call方法

    protected ScalarSynchronousObservable(final T t) {        super(new OnSubscribe() {            @Override            public void call(Subscriber<? super T> s) {                s.onNext(t);                s.onCompleted();            }        });        this.t = t;    }

上面的t也就是最开始create创建的Observable,通过OperatorSubscribeOn的call方法创建的Subscriber的OnNext传递Observable就是这里的t然后执行在子线程里。

到这里一整个流程就结束了,实现了子线程加载数据,主线程更新UI。
其中最重要的是搞清楚lift方法,在RxJava里很多地方都有使用,对着源码捋一捋就可以理解差不多。

还有涉及到的OperatorSubscribeOn,OperatorObserveOn,可以说是切换线程的初始化操作。
对于异步的处理是在HandlerScheduler和CachedThreadScheduler里。

更多相关文章

  1. Android UI 的更新及其线程模型
  2. 关于“Android SDK manager中不出现完整Android版本安装包列表”
  3. Android多线程编程和线程池
  4. Android中的线程状态之AsyncTask详解
  5. Android关于apk版本更新方法
  6. Android之gallery 常见2种使用方法和3D效果总结

随机推荐

  1. powershell 远程安装MSI文件
  2. vscode的使用配置以及markdown常用语法
  3. 2021年3月春招百度,阿里,美团等大厂全新PHP
  4. 在vscode中go编码发生的问题整理
  5. HTML的必考知识点你会吗?
  6. 如何用Github轻松拉取谷歌容器镜像
  7. 面试必备的Java面试题及毕设项目
  8. 华为联运游戏或应用审核驳回:应用检测到支
  9. 超详细的TCP、Sokcket和SuperSocket与TCP
  10. ui设计要学插画吗?