上两篇文章都是对比分析RxJava中较基本的订阅流程与操作,即Observable、Flowable等基本元素的源码,还有map、lift操作符的源码。在对Rxjava框架有了一个坚实的基础后,此篇文章将直袭Rxjava中最闪亮的Point,也是Android猿平常在开发中经常遇到的需求 —— 线程切换,主线程中需要进行耗时操作,要求切换到子线程,子线程中需要进行UI更新,要切切换到主线程。以上切换原理如何?此篇将以揭晓。

(此系列文章重点在于分析源码,并不适合作为框架的初学篇,笔者建议熟悉其用法后再观此博客讨论学习。另此系列文章源码分析难度由浅至深,建议先学习理解之前的文章:
浅析RxJava 1.x&2.x版本使用区别及原理(一):Observable、Flowable等基本元素源码解析
浅析RxJava 1.x&2.x版本区别及原理(二):map、lift操作符源码解析

文章将首先分析线程调度者Scheduler的真正面目,了解线程切换的原理所在。再从subscribeOnobserveOn方法入手,知晓线程调度之间方法逻辑,结合此两点便可了解Rxjava框架的线程切换秘密。本篇涉及到的知识点有:

  • 线程变换的基本概念及1.x 版本线程变换使用
  • Scheduler源码及原理分析(主线程、子线程变换)
  • subscribeOn、observeOn方法源码分析

一. RxJava 1.x Scheduler线程变换

首先介绍一下线程变换的基本概念:

  • 让代码可以在不同的线程执行
  • subscribeOn ——订阅时的线程
  • observeOn —— 接收时的线程
  • Scheduler —— 实际做线程变换

1. Scheduler线程变换使用

Observable.create(new Observable.OnSubscribe() {                            @Override                            public void call(Subscriber<? super String> subscriber) {                                if (!subscriber.isUnsubscribed()) {                                    System.out.println("currentThread:" + Thread.currentThread());                                    subscriber.onNext("test");                                    subscriber.onCompleted();                                }                            }                        }).                        subscribeOn(Schedulers.newThread()).                        observeOn(AndroidSchedulers.mainThread()).                        subscribe(new Observer() {                            @Override                            public void onCompleted() {                            }                            @Override                            public void onError(Throwable e) {                            }                            @Override                            public void onNext(String s) {                                System.out.println("onNext:" + s + "currentThread:" + Thread.currentThread());                            }                        });

简单说明代码使用,除了一个常用的Observable创建create、订阅subscribe方法调用及实现外,此处在创建完Observable对象后运用了线程变换方法,即subscribeOn(Schedulers.newThread()),指定订阅时的线程为子线程(可做一些耗时操作),observeOn(AndroidSchedulers.mainThread())指定接收时的线程为主线程(可做一些UI变化)。

有了以上基本了解后,这里先提前列出RxJava 1.x版本中线程变换原理的重要元素:

  • Scheduler调度者
  • Operator操作符接口
  • lift核心操作符

以上重点就在于Scheduler抽象类,它才是线程切换的调度者,涉及到的重要元素有:

  • Scheduler抽象类
  • Worker —– 真正做线程调度的类
  • Action0 —— 在线程中执行的操作
  • schedule —— 实际做线程调度的方法


2. Scheduler源码分析

  • Scheduler调度者

找到Scheduler抽象类,查看其源码组成,如下:

可知Scheduler确实是一个抽象类,内有一个抽象方法createWorker(),方法参数是Worker。主要是还有一个抽象内部类Worker,实现了Subscription接口,内部有不同参数重载的两个schedule方法,方法中第一个参数的类型都是Action0。对比以上列举的重要元素,全部集齐!

同时也可以看出,虽然Scheduler抽象类是线程切换的调度者,但其内部只有一个抽象方法,因此幕后BOSS还是内部抽象类Worker,其内部的schedule方法作用就是进行线程调度,参数Action0就是即将被调度的线程操作。


  • 原理归纳

下面回到我们调用的线程方法源码上来,在分析之前还是抛出一些原理步骤归纳:

  1. 传入不同Scheduler来调度不同的线程
  2. 用Scheduler创建Worker来使用真正的线程池
  3. 传入具体操作Action0
  4. 通过schedule方法来实现调度

首先第一个步骤我们在调用subscribeOn(Schedulers.newThread())observeOn(AndroidSchedulers.mainThread())线程切换的方法时已经使用到了,接着深入Schedulers源码,查看其第二个步骤具体实现。

注意:这里不要将Scheduler和Schedulers弄混淆,前者是线程调度者,而后者Schedulers是一个静态类,为了提供创建Scheduler对象方法的工厂模式。


  • Schedulers

首先映入眼帘的就是Schedulers类三个Scheduler类型成员变量,从其变量名可知分别运用于不同的场景:计算调度、io读写调度、新线程调度,结合之前对Schedulers介绍,它内部采用了工厂模式来创建不同类型的Scheduler,供开发者使用。

我们使用的是第三个newThreadScheduler,查看红框的创建源码可知,第一次调用是nt变量为null,由RxJavaSchedulersHook的createNewThreadScheduler()方法创建newThreadScheduler,继而查看其方法实现:

可见此方法内部调用了另外一个参数重载的createNewThreadScheduler方法,创建了一个RxThreadFactory对象实例作为参数传入。细心的你会发现创建RxThreadFactory时传入了“RxNewThreadScheduler-”字符串,而在一开始展示的例子输出订阅时所在的线程Log中也包含了此字符串。

继续查看下面重载的方法,它创建了一个NewThreadScheduler实例,并将方法参数RxThreadFactory传入其构造方法中,最后返回。重点就是NewThreadScheduler类,查看其详细实现:

  • NewThreadScheduler

如上,之前在介绍Scheduler抽象类时,内部一个创建Worker的抽象方法createWorker,现在找到了其继承类——NewThreadScheduler,重点查看它实现的createWorker方法,创建了一个NewThreadWorker对象返回,将成员变量NewThreadWorker传给构造方法。再来查看NewThreadWorker源码:

  • NewThreadWorker

NewThreadWorker类中代码太多,这里先讲解重点部分。如上图,此类必然是继承于Worker抽象类,同时又实现了Subscription接口。注意其成员变量红框部分,既然Worker的继承类是真正实现线程调度的BOSS,其过程必定会有线程池的参与,其成员变量EXECUTORS正好印证,后续再讲。NewThreadWorker既然继承于抽象类Worker,必定实现了重要抽象方法schedule方法,查看其源码:

schedule方法内部最终调用的还是重载的多参数方法,查看多参数的schedule方法,发现其内部又调用了subscribeActual方法,并传入具体操作Action0参数,其方法名暗示也很明显了,来查看这个“真正调度”方法的实现:

subscribeActual方法的后两个参数指定延长时间,重点放到具体操作Action0参数。首先第一行代码RxJavaHooks.onScheduledAction(action)就是返回参数action,赋值给成员变量decoratedAction;下一步又使用ScheduledAction对Action0进行了一次包装,返回ScheduledAction类型变量run,后面直接将这个变量run传入executor线程池去运行了。

以上是这个重点方法的大致逻辑,再来详细查看细节部分,我们似乎还不太清楚ScheduledAction类型变量run的真面目,但说到executor线程池直接去运行run了,这结果已经不言而喻了。笔者抢答:线程池运行的肯定是线程啊,莫非ScheduledAction继承于Thread?好像离真相越来越近了,先来查看ScheduledAction的真面目:

果不其然,不过ScheduledAction是实现的Runnable接口,其run方法中调用了Action0接口的call方法。有了简单了解后再回到subscribeActual方法,可得知线程池运行这个Runnable,最终调用其run方法,但真正核心是调用Action0接口的call方法,它也是线程调度的原理。


  • 小结

至此,再回到源码分析中一开始列举的原理四步骤:

  1. 第一步已在我们调用线程调度方法时实现了,即传入不同Scheduler来调度不同的线程
  2. 深入Scheduler发现其内部只有一个createWorker方法和内部抽象类Worker,Scheduler调度者的幕后是创建创建Worker来使用真正的线程池,即第二步总结。
  3. Worker抽象类内部有schedule调度方法,其参数Action0就是具体的操作,即第三步总结。
  4. 最终Worker的子类实现了schedule调度方法,内部就是通过线程池运行Runnable,其run方法核心就是调用Action0接口的call方法,来实现线程调度!


3. Android中的Scheduler

我们在代码中通过subscribeOn(Schedulers.newThread())来指定订阅时的子线程,以上已介绍完Schedulers。又通过observeOn(AndroidSchedulers.mainThread())指定接收数据时的主线程,来查看AndroidSchedulers的详细实现。

还是先预告其原理核心:通过Handler和Looper来实现执行在主线程。

AndroidSchedulers不同于Schedulers内部提供了3个Scheduler调度者(计算、IO、新线程)选择,此处只有一个主线程mainThreadScheduler。我们是通过调用此类的mainThread()方法获取Scheduler返回,查看其详细实现,最终创建Scheduler的地方是其构造方法红框处!

真相大白,这里创建了一个LooperScheduler对象实例,并将主线程的Looper传入到构造方法中。笔者看到Looper真是觉得倍感亲切,之前一篇文章Android 消息机制:Handler、Looper、Message源码 详细版解析 ——从入门到升天分析过,Android的主线程就是通过Handler、Looper、Message来处理消息,因此Looper都出现了,Handler还会远吗~来查看LooperScheduler实现:

  • LooperScheduler

果不其然,LooperScheduler的内部唯一成员变量就是Handler,通过创建LooperScheduler时传入的主线程Looper,生成Handler。且LooperScheduler继承了Scheduler,查看重点实现创建Worker的createWorker() 方法,创建了一个继承于Worker的HandlerWorker类,并传入Handler至其构造方法。来查看HandlerWorker类实现:

  • HandlerWorker

(注意:Looper、Handler都已出现,期待已久的Message即将登场~)

首先HandlerWorker类构造方法为成员变量Handler赋值,再看最终重载的schedule方法,重点在于红框部分。笔者露出了会心一笑,这些Handler发送Message部分相信Android猿们应当再熟悉不过了。

  1. 首先创建包装了Action0和handler的ScheduledAction,之前已经介绍过它实现的Runnable接口,其run方法中调用了Action0接口的call方法。
  2. 接着封装了包含ScheduledAction的一个Message对象。
  3. 通过主线程的Handler发送该Message,发送到的线程就是MainLooper主线程。
  4. 过程结束,当MainLooper主线程收到该消息后,会执行其run方法,其中包含着线程调度的核心,也就是Action0的call方法最终被执行。


3. Schedulers和AndroidSchedulers小结

  • 方法:
    • Schedulers: 指定订阅时的子线程subscribeOn(Schedulers.newThread())中的Schedulers
    • AndroidSchedulers:指定接收数据时的主线程 observeOn(AndroidSchedulers.mainThread())的AndroidSchedulers
  • 线程切换机制:
    • Schedulers: Java机制中的线程池执行封装过Action0接口的Runnable进行线程调度,关键处是Action0接口的call方法。
    • AndroidSchedulers: Android中的消息机制,通过获取主线程的Looper,再创建与之对应的handler,将Action0接口封装到Message中,使用主线程的handler发送。当主线程的Looper收到消息后最终执行Action0接口的call方法,完成线程调度。




二. RxJava 1.x subscribeOn、observeOn原理分析

1 subscribeOn原理

前面部分讲解都是subscribeOn(Schedulers.newThread())参数部分,理解了大致的线程变换机制和Action0接口的重要性,在此回到一开始展示的基本使用代码,通过subscribeOn方法指定订阅时的线程,内部具体原理如何?

此节将深入分析subscribeOn方法源码,在此之前还是先列举出其原理步骤:

  1. 通过OnSubscribe接口来处理
  2. 利用Scheduler将发出动作Action0放到线程中执行

如上subscribeOn源码,最终返回了一个新的Observable对象,而方法内部调用了Observable的create方法来创建实例,此方法需要传入OnSubscribe接口参数,而源码中是创建了一个OperatorSubscribeOn实例,并传入旧的Observable实例和Scheduler实例。相必OperatorSubscribeOn实现了OnSubscribe接口,查看其实现:

果然如此!当我们再调用subscribeOn(Schedulers.newThread())取指定订阅时的子线程,调用之后会重新返回一个新的Observable,并创建一个新的OnSubscribe接口。这也就意味着基本元素订阅原理的最后一步,即调用OnSubscribe接口的call方法并传入Subscriber时,此OnSubscribe接口是新的OnSubscribe接口,而并非是我们调用create方法创建Observable传入的OnSubscribe接口。

继续查看其重点方法call,调用scheduler.createWorker()创建了一个线程调度对象Worker,调用schedule方法进行线程调度,意味着schedule方法中的参数——实现的Action0接口中的call逻辑在Worker调度的线程中执行! 而一开始我们调用subscribeOn(Schedulers.newThread())传入的是newThread,因此Action0接口中的call逻辑在newThread子线程中执行!

继续查看其重点方法call的参数Subscriber类型,而在newThread子线程中执行的Action0接口中的call方法,对Subscriber重新进行了一次简单封装在实现对应的onNextonComplete方法内部调用的还是的旧Subscriber实例,最后一行则是调用Observable的subscribe订阅方法,传入新的Subscriber实例。这也就意味着我们调用create方法创建Observable传入的OnSubscribe接口中call方法内对Subscribe的逻辑操作,例如调用Subscribe的onNext方法,这些发起的操作被执行的线程是子线程。

以上方法印证了先前列举的原理步骤中的第二步:OperatorSubscribeOn类中的call方法内部利用Scheduler将发出动作Action0放到线程中执行

最后再总结一下OperatorSubscribeOn类中的call方法内部逻辑:

  • 获取调度者Worker
  • 调用Worker的调度方法schedule,由于之前传入的参数是newThread,因此Action0接口中的所有逻辑操作都是在子线程newThread中执行。
  • Action0接口中的call方法重新封装了Subscribe类并传入,这也就意味着调用Subscribe的onNext等方法这些发起操作时,执行的线程实在子线程中!这也是为何可以将发出的动作被执行于子线程的核心所在。



2. observeOn原理

此节将深入分析observeOn方法源码,在此之前还是先列举出其原理步骤:

  • 采用lift操作符(代理模式)
  • 通过Subscriber来处理
  • 利用Scheduler将发出动作Action0放到线程中执行

此重载方法最终调用的是如上,内部采用了lift操作符,真相已然明确一半,在上一篇文章中分析过:lift操作符创建了并返回了一个类似于代理的Observable,来接收原Observable发起的数据,然后在Operator中重新包装了一个新的Subscriber实例返回,此实例中预先对数据做一些处理后传递并调用原Subscriber的onNext等方法。

此处的observeOn亦同理,创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNextonCompeteonError方法。

来查看observeOn操作符的源码:

如上图红框所示,其逻辑与上一篇文章中lift操作符原理完全类似!只是此处多了一个逻辑处理,即对线程切换的处理。继续查看封装后的Subscriber实现,是如何处理线程切换,ObserveOnSubscriber源码如下:

    //代理观察者    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {        final Subscriber<? super T> child;        final Scheduler.Worker recursiveScheduler;        final NotificationLite on;        final Queue queue;        //接收上一级Observable发出的数据        @Override        public void onNext(final T t) {            if (isUnsubscribed() || finished) {                return;            }            if (!queue.offer(on.next(t))) {                onError(new MissingBackpressureException());                return;            }            schedule();        }        ......        //线程切换处理数据⭐️⭐️⭐️⭐️⭐️        protected void schedule() {            if (counter.getAndIncrement() == 0) {                recursiveScheduler.schedule(this);//调用Worker调度者的schedule方法切换线程!            }        }        //在新的线程中将数据发送给目标观察者        @Override        public void call() {            long missed = 1L;            long currentEmission = emitted;            final Queue q = this.queue;            final Subscriber<? super T> localChild = this.child;            final NotificationLite localOn = this.on;            for (;;) {                while (requestAmount != currentEmission) {                    ...                    localChild.onNext(localOn.getValue(v));                }            }        }    }}   

如上,代码中的注释已做简单分析,observeOn方法处理线程调度的主要逻辑与subscribeOn相同,都是依赖Worker类的schedule方法切换线程,根据不同实现类型的Woker(例如子线程NewThreadWorker、主线程HandlerWorker)切换到与之对应的线程,只不过封装的调用线程切换方法的对象不同(前者是subscribeOn接口,后者是observeOn类)。



3. subscribeOn与observeOn原理小结

  • 调度的线程:
    • subscribeOn方法:创建Observable时实现的call方法中调用Subscriber方法,这些发起的动作执行于调用subscribeOn方法时传入的指定线程(如子线程newThread、ioThread、computationThread)。
    • observeOn方法: 调用subscribe传入的observer接口中onNextonCompleteonError的逻辑处理,被执行于调用observeOn方法时传入的指定线程(如Android主线程)。
  • 处理线程切换而包装的实例:
    • subscribeOn方法: 是包装原OnSubscribe过后的OperatorSubscribeOn,在继承的call方法中调用Worker的scheduler方法来切换线程。
    • observeOn方法: 是包装原Subscribe过后的ObserveOnSubscriber,在实现的onNextonCompleteonError中调用Worker的scheduler方法来切换线程。
  • 调度者:
    • subscribeOn方法: Schedulers(例如NewThreadScheduler中的NewThreadWorker)
    • observeOn方法: LooperScheduler中的HandlerWorker




前两篇文章都是先讲解Rxjava 1.x版本的源码分析,再讲解2.x 版本这样对比着来的。介于此部分篇幅较长,将其拆分成两篇文章,下一篇分析2.x 版本线程变换机制。

若有错误,虚心指教~

更多相关文章

  1. Android(安卓)进阶之了解源码——Activity启动
  2. 查看Android(安卓)API文档的正确方式
  3. Android手势源码浅析-----手势绘制(GestureOverlayView)
  4. Android性能优化篇:Android中如何避免创建不必要的对象
  5. android高仿微信视频编辑页-视频多张图片提取
  6. 一起来开发Android的天气软件(三)——使用Volley实现网络通信
  7. Android滑动冲突之完美实现RecycleView+下拉刷新+上拉加载+粘性H
  8. Android分析View的scrollBy()和scrollTo()的参数正负问题原理分
  9. 【Android】android镜像翻转

随机推荐

  1. Android面试题收集
  2. Broadcast
  3. Android 样式开发(二)selector篇
  4. Android SQLite系列
  5. android Logger 一二三
  6. 認識Android的BinderProxy和Binder類別
  7. 3.6.3新版本AndroidStudio报Could not re
  8. Android功能模块化之生成验证码Bitmap
  9. android 设置listview item选中背景色
  10. android下的gdb调试