• 线程控制Scheduler二
        • 1Scheduler的API二
        • 2Scheduler的原理二
          • 1 subscribeOn
            • 这边是版本110的源码
            • 这边是版本116的源码 建议看这个
          • 2 observeOn
          • 3 延伸doOnSubscribe

1.线程控制Scheduler(二)

给 Android 开发者的 RxJava 详解

1)Scheduler的API(二)

前面讲到了,可以利用 subscribeOn()结合 observeOn()来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap()等变换方法后,n能不能多切换几次线程?

答案是:能。因为observeOn() 指定的是 Subscriber 的线程,而这个Subscriber并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()参数中的 Subscriber,而是observeOn()执行时的当前 Observable所对应的 Subscriber,即它的直接下级Subscriber 。换句话说,observeOn()指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()即可。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定    .subscribeOn(Schedulers.io())    .observeOn(Schedulers.newThread())    .map(mapOperator) // 新线程,由 observeOn() 指定    .observeOn(Schedulers.io())    .map(mapOperator2) // IO 线程,由 observeOn() 指定    .observeOn(AndroidSchedulers.mainThread)     .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn()的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn()subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava线程控制的原理说起吧。

2)Scheduler的原理(二)

下面这个是 1.1.0的 版本:

public final Observable<T> subscribeOn(Scheduler scheduler) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return nest().lift(new OperatorSubscribeOn<T>(scheduler));}public final Observable<T> observeOn(Scheduler scheduler) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return lift(new OperatorObserveOn<T>(scheduler));}

可以看出其实 subscribeOn()Observable()的内部实现,也是用的lift()

下面这个 是1.1.6的版本:

public final Observable<T> subscribeOn(Scheduler scheduler) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return create(new OperatorSubscribeOn<T>(this, scheduler));}public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));}

具体看图(不同颜色的箭头表示不同的线程):

subscribeOn()原理图:

observeOn()原理图:

从图中可以看出,subscribeOn()observeOn()都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在OnSubscribe中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn()的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级Subscriber 发送事件时,因此 observeOn()控制的是它后面的线程。

(1) subscribeOn()

版本为1.1.01.1.6的 内部源码实现不同

compile 'io.reactivex:rxjava:1.1.0'compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.6'compile 'io.reactivex:rxandroid:1.2.1'
❶ 这边是版本1.1.0的源码

首先判断当前observable是不是ScalarSynchronousObservable类型的,如果是则会直接执行scalarScheduleOn方法,如果不是,则会通过nest()方法创建一个ScalarSynchronousObservable并把当前的Observable包装进去,并且lift一个OperatorSubscribeOn进去。

public final Observable<T> subscribeOn(Scheduler scheduler) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return nest().lift(new OperatorSubscribeOn<T>(scheduler));}

subscribeOn 就是把 一个Observable转为 另一个 Observable, 那

一步步分析:

nest()做了什么呢?看下面源码可以知道,他创建了一个ScalarSynchronousObservable

public final Observable<Observable<T>> nest() {    return just(this);}public final static <T> Observable<T> just(final T value) {    return ScalarSynchronousObservable.create(value);}

OperatorSubscribeOn做了什么? 看他构造方法可以知道他,保存了一个scheduler对象 ,注意看他的call方法的 ,它运行时的线程,在inner这个Worker上,于是它的运行线程已经被改了

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {    private final Scheduler scheduler;    public OperatorSubscribeOn(Scheduler scheduler) {        this.scheduler = scheduler;    }    @Override    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {        final Worker inner = scheduler.createWorker();        subscriber.add(inner);        return new Subscriber<Observable<T>>(subscriber) {            @Override            public void onCompleted() {}            @Override            public void onError(Throwable e) {                subscriber.onError(e);            }            @Override            public void onNext(final Observable<T> o) {                // inner.schedule 就是在这边进行线程切换,                 inner.schedule(new Action0() {                    @Override                    public void call() {                        final Thread t = Thread.currentThread();                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {                            @Override                            public void onCompleted() {                                subscriber.onCompleted();                            }                            @Override                            public void onError(Throwable e) {                                subscriber.onError(e);                            }                            @Override                            public void onNext(T t) {                                subscriber.onNext(t);                            }                            @Override                            public void setProducer(final Producer producer) {                                subscriber.setProducer(new Producer() {                                    @Override                                    public void request(final long n) {                                        if (Thread.currentThread() == t) {                                            producer.request(n);                                        } else {                                            inner.schedule(new Action0() {                                                @Override                                                public void call() {                                                    producer.request(n);                                                }                                            });                                        }                                    }                                });                            }                        });                    }                });            }        };    }}

但是 OperatorSubscribeOn 的call 方法在什么时候调用呢?那我们就要来看下lift(), 就是在hook.onLift(operator).call(o)这边进行调用的,

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {        return new Observable<R>(new OnSubscribe<R>() {            @Override            public void call(Subscriber<? super R> o) {                try {                    // 就是在这边调用的哦!!!!切换了线程                    Subscriber<? super T> st = hook.onLift(operator).call(o);                    try {                        st.onStart();                        onSubscribe.call(st);                    } catch (Throwable e) {                        Exceptions.throwIfFatal(e);                        st.onError(e);                    }                } catch (Throwable e) {                    Exceptions.throwIfFatal(e);                    o.onError(e);                }            }        });    }
❷ 这边是版本1.1.6的源码 建议看这个
 public final Observable<T> subscribeOn(Scheduler scheduler) {        if (this instanceof ScalarSynchronousObservable) {            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);        }        return create(new OperatorSubscribeOn<T>(this, scheduler)); // 这边是重点,就是这边修改了    }

Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2OnSubscribe_2(= OperatorSubscribeOn)

那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的引用,即下面代码的source(上面代码的this)。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {    final Scheduler scheduler;    final Observable<T> source;    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {        this.scheduler = scheduler;        this.source = source;    }    @Override    public void call(final Subscriber<? super T> subscriber) {        final Worker inner = scheduler.createWorker();        subscriber.add(inner);        inner.schedule(new Action0() {            @Override            public void call() {                final Thread t = Thread.currentThread();                Subscriber<T> s = new Subscriber<T>(subscriber) {                    @Override                    public void onNext(T t) {                        subscriber.onNext(t);                    }                    @Override                    public void onError(Throwable e) {                        try {                            subscriber.onError(e);                        } finally {                            inner.unsubscribe();                        }                    }                    @Override                    public void onCompleted() {                        try {                            subscriber.onCompleted();                        } finally {                            inner.unsubscribe();                        }                    }                    @Override                    public void setProducer(final Producer p) {                        subscriber.setProducer(new Producer() {                            @Override                            public void request(final long n) {                                if (t == Thread.currentThread()) {                                    p.request(n);                                } else {                                    inner.schedule(new Action0() {                                        @Override                                        public void call() {                                            p.request(n);                                        }                                    });                                }                            }                        });                    }                };                source.unsafeSubscribe(s);            }        });    }}

我们看到它实现OnSubscribe,并保存了原来的ObservableScheduler。创建一个用于在不同线程执行的Worker对象,然后worker调用schedule()去执行Observable1subscribe()

再来看subscribeOn中调用的create方法

public static <T> Observable<T> create(OnSubscribe<T> f) {    return new Observable<T>(hook.onCreate(f));}//hook的onCreate方法public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {    return f;}

没做什么,就是把使用刚刚创建OperatorSubscribeOn作为参数,返回给方法链下一个。 那到这里我们就清晰了,其实调用了subscribeOn后,返回的Observable已经不是最开始创建那个。我们来总结一下,先把create创建的Observable称为Observable1,把它的OnSubscribe称为OnSubscribe1,把subscribeOn中创建的Observable称为Observable2,把它的OnSubscribe(OperatorSubscribeOn implements OnSubscribe)称为OnSubscribe2

 public final Subscription subscribe(Subscriber<? super T> subscriber) {        return Observable.subscribe(subscriber, this);    }

在方法链最后调用了subscribe(),应该是调用了Observable2subscribe(),那这时候就会调用到OnSubscribe2call方法,call中的参数就我们在方法链中传进来的。call中会使用worker切换线程去执行,到最后才是调用了OnSubscribe1unsafeSubscribe方法,并把重新封装的Subscriber传过去。而unsafeSubscribe会调用OnSubscribe1call(),所以其他流程是不变的,只是subscribe()执行的线程改了。

那为什么多次调用subscribeOn是无效的呢?因为多调用一次,只是多了一层先把OnSubscribe,但其实会对Observable1subscribe()产生影响的是OnSubscribe2,其他的只会对它前面的产生影响。就好比下面这样

new Thread() {        @Override        public void run() {            new Thread() {                @Override                public void run() {                    Log.i(TAG,"run");                }            }.start();        }    }.start();
(2) observeOn()

1.1.6版本源码

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));}

observeOn()这边用到的是lift()(这边的lift()1.1.0lift(),略有差别,但是本质call方法是一样的),在这边 创建了一个新的 Onsubscribe(=OnSubsribeLift)

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();    final OnSubscribe<T> parent;    final Operator<? extends R, ? super T> operator;    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {        this.parent = parent;        this.operator = operator;    }    @Override    public void call(Subscriber<? super R> o) {        try {            Subscriber<? super T> st = hook.onLift(operator).call(o);            try {                st.onStart();                parent.call(st);            } catch (Throwable e) {                Exceptions.throwIfFatal(e);                st.onError(e);            }        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            o.onError(e);        }    }}

可以看到在lift中调用了OperatorObserveOncall,传进去Subscriber创建了一个新的Subscriber,再将其传入onSubscribe.call()中,说明了这个新的Subscriber.onNext(),会被调用,而且它肯定在里面还调用了原来的Subscriber.onNext()方法。 到OperatorObserveOn中就会发现,这个是新的SubscriberOperatorObserveOn的内部类ObserveOnSubscriber

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {    this.child = child;    this.recursiveScheduler = scheduler.createWorker();    .......        }    void init() {          ......        Subscriber<? super T> localChild = child;        ......        localChild.add(recursiveScheduler);        localChild.add(this);    }    @Override    public void onStart() {        ......    }    @Override    public void onNext(final T t) {        ......        schedule();    }    @Override    public void onCompleted() {         ......       schedule();    }    @Override    public void onError(final Throwable e) {         ......       schedule();    }    protected void schedule() {        if (counter.getAndIncrement() == 0) {            recursiveScheduler.schedule(this);        }    }    // only execute this from schedule()    @Override    public void call() {        ......        for (;;) {            if (checkTerminated(finished, q.isEmpty(), localChild, q)) {                return;            }            ......                                    localChild.onNext(localOn.getValue(v));            ......                    }    }    boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {        if (a.isUnsubscribed()) {        ......           if (e != null) {                a.onError(e);            } else {                a.onCompleted();            }        ......                    return false;    }}

我们看到onNextonCompletedonError中都调用了scheduleschedule中又调用了recursiveScheduler.schedule(this)recursiveScheduler就是根据我们设置的线程模式生成的Worker,在这里线程就切换到我们指定的那边了。然后又调用到this.call(),在call里面又调用了localChild.onNextonErroronCompleted,这个localChild就是原来的Subscriber,所以onNextonErroronCompleted就会在我们指定的线程调用了。

那为什么会对observeOn后面的mapflatMap起作用,因为map里面其实也是调用了lift去创建一个新的Subscriber,所以也就会在新的线程中执行。

为什么每次observeOn都会起作用,因为它跟下面这个差不多。

new Thread() {        @Override        public void run() {                 map();            new Thread() {                @Override                public void run() {                    map();                   onNext();                }            }.start();        }    }.start();
(3) 延伸:doOnSubscribe()

然而,虽然超过一个的 subscribeOn()对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲Subscriber 的时候,提到过 SubscriberonStart()可以用作流程开始前的初始化。然而 onStart()由于在subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()将会在什么线程执行。

而与 Subscriber.onStart()相对应的,有一个方法Observable.doOnSubscribe()。它和 Subscriber.onStart()同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在doOnSubscribe()之后有 subscribeOn()的话,它将执行在离它最近的 subscribeOn()所指定的线程。

示例代码:

Observable.create(onSubscribe)    .subscribeOn(Schedulers.io())    .doOnSubscribe(new Action0() {        @Override        public void call() {            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行        }    })    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程    .observeOn(AndroidSchedulers.mainThread())    .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn(),就能指定准备工作的线程了。

参考
给 Android 开发者的 RxJava 详解
RxJava中的线程切换源码分析

更多相关文章

  1. SpringBoot 2.0 中 HikariCP 数据库连接池原理解析
  2. Services的生命周期
  3. android ndk的使用
  4. 面试总结(6):ScheduledExecutorService的使用
  5. Android——PullToRefresh自动刷新
  6. ANDROID STUDIO&&Eclipse Android项目缺少R文件解决方法(完解)
  7. 浅谈Android(安卓)java层ServiceManager
  8. android中WebView的Java与JavaScript交互
  9. Android中插件化的简单实现:启动未注册的Activity

随机推荐

  1. Android(安卓)Toast工具类大放送
  2. Android Handler消息机制从原理到应用详
  3. 什么是android market?国内三大类android
  4. adb通过TCP/IP来调试Android设备
  5. 分享:Android浏览器,用NDK C++做底层开发
  6. Facebook Home开启Android后应用时代
  7. android多语言切换失效
  8. Android即时消息介绍
  9. Android简介及发展历程
  10. 在eclipse中查看android SDK的源代码