解析如下代码:Observable.create(new ObservableOnSubscribe() {@Override public void subscribe(ObservableEmitter e) throws Exception {e.onNext("hello");}}).map(new Function() {             @Override             public String apply(String s) throws Exception {                 return "helloworld";             }         }).subscribe(new Observer{public void onNext(String value){     System.out.println(value);}});

这段代码可以分解为

observableA  = Observable.create(new ObservableOnSubscribe() {@Override public void subscribe(ObservableEmitter e) throws Exception {e.onNext("hello");}}); observableB = observableA.map(new Function() {             @Override             public String apply(String s) throws Exception {                 return "helloworld";             }         });observableB.subscribe(new Observer{public void onNext(String value){     System.out.println(value);}})

 

observableA经过map后成为observableB,(ObservableA简称A,ObservableB简称B)

B注册了observerF.

B.subscribe(obseberF)

点击查看subscribe的源码,大体可以简化为:

b.subscribeActual();

subscribeActual方法是observalbe类的抽象方法。所有的observable都要重写这个方法。

B是A经过Map后变化而成的observable. 

所以B是oberservable的一个实现子类ObservableMap.

该类重写了subscribeActual方法;

source.subscribe(new MapObserver(t, function));

可以看到,MapObserver有两个构造参数。

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) 

一个是Observer,另一个是Fuction.

Observer就是observerF. Fuction可以理解为map变换中的参数。

source就是指的map操作符之前的oberservable。这里就是A。

即执行A.subscribe(new MapObserver(t,function)

同理,A在执行sunscribe时也会执行他的subscribeActual的方法。

举例说 A = Observeable.creat()而来的。所以A是是oberservable的一个实现子类ObservableCreate。

ObservableCreate的subscribeActual内容是

source.subscribe(new MapObserver(t,f));

source是create方法的参数ObservableOnSubscribe。

source.subscribe(parent)的意思就是执行下面方法体中的内容。

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("hello");

}

这里的参数e是经过MapObserver包装而成的一个Emitter类型,这里e.onNext时候会调用MapObsever 的onNext方法。

而MapObserver的onNext方法可以简化为

public void onnext()

{

U v;try {    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {    fail(ex);    return;}actual.onNext(v);

}

就是说先执行Map的变换方法。然后把这个方法的结果传递给observerF.

observerF在根据map的返回值调用next方法。

即下面这个方法。

subscribe(new Observer{

public void onNext(String value){
     System.out.println(value);
}

};

可以看到代码上我们是从上而下写的。

但在执行时subscribeActual方法是自下而上执行的。而OnNext方法或者apply是从上而下执行的。

了解这个原理后,我们看一下切换线程。

首先来看一下subscribeOn.

observable 在执行subscribeOn后返回了一个ObservableSubscribeOn类型。也是Observable的子类型之一。

看一下它的subscribeActual方法。

@Override    public void subscribeActual(final Observer<? super T> s) {        final SubscribeOnObserver parent = new SubscribeOnObserver(s);        s.onSubscribe(parent);        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {            @Override            public void run() {                source.subscribe(parent);            }        }));    }

意思是所有执行切换线程的那个observable在执行subscribe的时候就切换了线程。

从上一个结论我们知道subscribeActual方法是自下而上执行的。也就是说在切换线程之前的所有的observable的subscribe的线程一直在新线程上运行。

因为是subscribeActual方法调用了onNext方法。所以从上而下执行的onNext方法也是在新线程上运行。

onNext方法一路向下调用,一直到最终注册的Observer.

这就解释了为什么我们明明写了一个subscribeOn为什么所有的OnNext的方法的执行都在新线程上。也就是说subscribe不仅影响之后的onNext的执行线程,也影响之前的onNext线程.

如果一个obserable串写了两个subscribeOn那么逆流而上执行的时候回选用先切为倒数第一个的线程,然后切为倒数第二个,直至切换为第一个的线程。然后自上而下执行onNext的时候就以最后切换的线程为准,即第一次切换的线程。

这就解释了为什么多次执行subscribeOn来切换线程,只有第一个有效的原因。


接下来我们看一下ObserveOn的方法。

observable 在执行subscribeOn后返回了一个ObservableObserveOn类型。

@Override    protected void subscribeActual(Observer<? super T> observer) {        if (scheduler instanceof TrampolineScheduler) {            source.subscribe(observer);        } else {            Scheduler.Worker w = scheduler.createWorker();            source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));        }    }

在subscribeActual中并没有直接切换线程。只是将这个线程 w 存入了新的observer中。

那么oberveOn切换线程是在哪里执行的呢。在ObserveOnObserver中,我们看一下他的onNext方法。

        @Override        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != QueueDisposable.ASYNC) {                queue.offer(t);            }            schedule();        }        void schedule() {            if (getAndIncrement() == 0) {                worker.schedule(this);            }        }

可以看到在onNext的时候,开始部署了新线程。我们看一下新线程开始后会怎样。

   @Override        public void run() {            if (outputFused) {                drainFused();            } else {                drainNormal();            }        }void drainNormal() {            int missed = 1;            final SimpleQueue q = queue;            final Observer<? super T> a = actual;            for (;;) {                if (checkTerminated(done, q.isEmpty(), a)) {                    return;                }                for (;;) {                    boolean d = done;                    T v;                    try {                        v = q.poll();                    } catch (Throwable ex) {                        Exceptions.throwIfFatal(ex);                        s.dispose();                        q.clear();                        a.onError(ex);                        return;                    }                    boolean empty = v == null;                    if (checkTerminated(d, empty, a)) {                        return;                    }                    if (empty) {                        break;                    }                    a.onNext(v);                }                missed = addAndGet(-missed);                if (missed == 0) {                    break;                }            }        }

代码中的a.onNext(v)是在run中进行的。就是说这里已经运行在了新线程中。a指的是observer就是自下而上逆行时下游的oberserver.所以observerOn只能影响在其后面的线程。

如果多个observeOn同时使用时,由于obserbeOn影响的是onNext(),onNext自上而下运行。遇到新的observerOn就会切换到新的线程。

所以observeOn影响的是下游的onNext,遇到新的observeOn之后就会切换为新的线程,只对其后面的有效。

更多相关文章

  1. 浅谈Java中Collections.sort对List排序的两种方法
  2. Python list sort方法的具体使用
  3. python list.sort()根据多个关键字排序的方法实现
  4. android上一些方法的区别和用法的注意事项
  5. android实现字体闪烁动画的方法
  6. Android中dispatchDraw分析
  7. 浅析Android中的消息机制-解决:Only the original thread that cr
  8. Android四大基本组件介绍与生命周期
  9. Android(安卓)MediaPlayer 常用方法介绍

随机推荐

  1. android TextView字体颜色根据焦点点击变
  2. Android里的动画(补间动画,帧动画,属性动画)
  3. Android开发指南-用户界面-绘制视图
  4. ListView的分割线相关属性
  5. [整] Android ListView 去除边缘阴影、选
  6. Android 如何使用GPU硬件加速
  7. Android ImageView 总结
  8. android中的UI控制(一)
  9. 33、Android Support兼容包详解
  10. android更换控件默认样式