RxJava 学习笔记(四)
-
-
- 线程控制Scheduler二
- 1Scheduler的API二
- 2Scheduler的原理二
- 1 subscribeOn
- 这边是版本110的源码
- 这边是版本116的源码 建议看这个
- 2 observeOn
- 3 延伸doOnSubscribe
- 1 subscribeOn
- 线程控制Scheduler二
-
1.线程控制Scheduler(二)
给 Android 开发者的 RxJava 详解
1)Scheduler的API(二)
前面讲到了,可以利用 subscribeOn()
结合 observeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map()
flatMap()
等变换方法后,n能不能多切换几次线程?
答案是:能。因为observeOn()
指定的是 Subscriber 的线程,而这个Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()
参数中的 Subscriber
,而是observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()
即可。上代码:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
如上,通过 observeOn()
的多次调用,程序实现了线程的多次切换。
不过,不同于 observeOn()
,subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn()
呢?会有什么效果?
这个问题先放着,我们还是从 RxJava
线程控制的原理说起吧。
2)Scheduler的原理(二)
下面这个是 1.1.0
的 版本:
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return nest().lift(new OperatorSubscribeOn<T>(scheduler));}public final Observable<T> observeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler));}
可以看出其实 subscribeOn()
和 Observable()
的内部实现,也是用的lift()
。
下面这个 是1.1.6
的版本:
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler));}public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));}
具体看图(不同颜色的箭头表示不同的线程
):
subscribeOn()
原理图:
observeOn()
原理图:
从图中可以看出,subscribeOn()
和observeOn()
都做了线程切换的工作(图中的 "schedule..."
部位)。不同的是, subscribeOn()
的线程切换发生在OnSubscribe
中,即在它通知上一级 OnSubscribe
时,这时事件还没有开始发送,因此 subscribeOn()
的线程控制可以从事件发出的开端就造成影响;而 observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级Subscriber
发送事件时,因此 observeOn()
控制的是它后面的线程。
(1) subscribeOn()
版本为1.1.0
和1.1.6
的 内部源码实现不同
compile 'io.reactivex:rxjava:1.1.0'compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.6'compile 'io.reactivex:rxandroid:1.2.1'
❶ 这边是版本1.1.0的源码
首先判断当前observable
是不是ScalarSynchronousObservable
类型的,如果是则会直接执行scalarScheduleOn
方法,如果不是,则会通过nest()
方法创建一个ScalarSynchronousObservable
并把当前的Observable
包装进去,并且lift
一个OperatorSubscribeOn
进去。
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return nest().lift(new OperatorSubscribeOn<T>(scheduler));}
subscribeOn 就是把 一个Observable
转为 另一个 Observable
, 那
一步步分析:
①
nest()
做了什么呢?看下面源码可以知道,他创建了一个ScalarSynchronousObservable
public final Observable<Observable<T>> nest() { return just(this);}public final static <T> Observable<T> just(final T value) { return ScalarSynchronousObservable.create(value);}
②
OperatorSubscribeOn
做了什么? 看他构造方法可以知道他,保存了一个scheduler
对象 ,注意看他的call
方法的 ,它运行时的线程,在inner
这个Worker
上,于是它的运行线程已经被改了
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> { private final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { @Override public void onCompleted() {} @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(final Observable<T> o) { // inner.schedule 就是在这边进行线程切换, inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (Thread.currentThread() == t) { producer.request(n); } else { inner.schedule(new Action0() { @Override public void call() { producer.request(n); } }); } } }); } }); } }); } }; }}
③
但是 OperatorSubscribeOn
的call 方法在什么时候调用呢?那我们就要来看下lift()
, 就是在hook.onLift(operator).call(o)
这边进行调用的,
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { // 就是在这边调用的哦!!!!切换了线程 Subscriber<? super T> st = hook.onLift(operator).call(o); try { st.onStart(); onSubscribe.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } }); }
❷ 这边是版本1.1.6的源码 建议看这个
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); // 这边是重点,就是这边修改了 }
把
Observable.create()
创建的称之为Observable_1
,OnSubscribe_1
。把subscribeOn()
创建的称之为Observable_2
,OnSubscribe_2(= OperatorSubscribeOn)
。那么,前两步就是创建了两个的
observable
,和OnSubscribe
,并且OnSubscribe_2
中保存了Observable_1
的引用,即下面代码的source
(上面代码的this
)。
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); }}
我们看到它实现OnSubscribe
,并保存了原来的Observable
和Scheduler
。创建一个用于在不同线程执行的Worker
对象,然后worker
调用schedule()
去执行Observable1
的subscribe()
。
再来看subscribeOn
中调用的create
方法
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f));}//hook的onCreate方法public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { return f;}
没做什么,就是把使用刚刚创建OperatorSubscribeOn
作为参数,返回给方法链下一个。 那到这里我们就清晰了,其实调用了subscribeOn
后,返回的Observable
已经不是最开始创建那个。我们来总结一下,先把create
创建的Observable
称为Observable1
,把它的OnSubscribe
称为OnSubscribe1
,把subscribeOn
中创建的Observable
称为Observable2
,把它的OnSubscribe(OperatorSubscribeOn implements OnSubscribe)
称为OnSubscribe2
。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
在方法链最后调用了subscribe()
,应该是调用了Observable2
的subscribe()
,那这时候就会调用到OnSubscribe2
的call
方法,call
中的参数就我们在方法链中传进来的。call
中会使用worker
切换线程去执行,到最后才是调用了OnSubscribe1
的unsafeSubscribe方
法,并把重新封装的Subscriber
传过去。而unsafeSubscribe
会调用OnSubscribe1
的call()
,所以其他流程是不变的,只是subscribe()
执行的线程改了。
那为什么多次调用subscribeOn
是无效的呢?因为多调用一次,只是多了一层先把OnSubscribe
,但其实会对Observable1
的subscribe()
产生影响的是OnSubscribe2
,其他的只会对它前面的产生影响。就好比下面这样
new Thread() { @Override public void run() { new Thread() { @Override public void run() { Log.i(TAG,"run"); } }.start(); } }.start();
(2) observeOn()
1.1.6
版本源码
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));}
observeOn()
这边用到的是lift()
(这边的lift()
和1.1.0
的 lift()
,略有差别,但是本质call
方法是一样的),在这边 创建了一个新的 Onsubscribe(=OnSubsribeLift)
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } }}
可以看到在lift
中调用了OperatorObserveOn
的call
,传进去Subscriber
创建了一个新的Subscriber
,再将其传入onSubscribe.call()
中,说明了这个新的Subscriber.onNext()
,会被调用,而且它肯定在里面还调用了原来的Subscriber.onNext()
方法。 到OperatorObserveOn
中就会发现,这个是新的Subscriber
是OperatorObserveOn
的内部类ObserveOnSubscriber
。
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); ....... } void init() { ...... Subscriber<? super T> localChild = child; ...... localChild.add(recursiveScheduler); localChild.add(this); } @Override public void onStart() { ...... } @Override public void onNext(final T t) { ...... schedule(); } @Override public void onCompleted() { ...... schedule(); } @Override public void onError(final Throwable e) { ...... schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } // only execute this from schedule() @Override public void call() { ...... for (;;) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } ...... localChild.onNext(localOn.getValue(v)); ...... } } boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) { if (a.isUnsubscribed()) { ...... if (e != null) { a.onError(e); } else { a.onCompleted(); } ...... return false; }}
我们看到onNext
、onCompleted
、onError
中都调用了schedule
,schedule
中又调用了recursiveScheduler.schedule(this)
,recursiveScheduler
就是根据我们设置的线程模式生成的Worker
,在这里线程就切换到我们指定的那边了。然后又调用到this.call()
,在call
里面又调用了localChild.onNext
、onError
、onCompleted
,这个localChild
就是原来的Subscriber
,所以onNext
、onError
、onCompleted
就会在我们指定的线程调用了。
那为什么会对observeOn
后面的map
、flatMap
起作用,因为map
里面其实也是调用了lift去创建一个新的Subscriber
,所以也就会在新的线程中执行。
为什么每次observeOn
都会起作用,因为它跟下面这个差不多。
new Thread() { @Override public void run() { map(); new Thread() { @Override public void run() { map(); onNext(); } }.start(); } }.start();
(3) 延伸:doOnSubscribe()
然而,虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲Subscriber
的时候,提到过 Subscriber
的 onStart()
可以用作流程开始前的初始化。然而 onStart()
由于在subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()
被调用时的线程。这就导致如果onStart()
中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar
,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()
将会在什么线程执行。
而与 Subscriber.onStart()
相对应的,有一个方法Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同样是在subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe()
发生的线程;而如果在doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
示例代码:
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定准备工作的线程了。
参考
给 Android 开发者的 RxJava 详解
RxJava中的线程切换源码分析
更多相关文章
- SpringBoot 2.0 中 HikariCP 数据库连接池原理解析
- Services的生命周期
- android ndk的使用
- 面试总结(6):ScheduledExecutorService的使用
- Android——PullToRefresh自动刷新
- ANDROID STUDIO&&Eclipse Android项目缺少R文件解决方法(完解)
- 浅谈Android(安卓)java层ServiceManager
- android中WebView的Java与JavaScript交互
- Android中插件化的简单实现:启动未注册的Activity