从Android开发者的角度去理解RxJava(四)
解析如下代码:Observable.create(new ObservableOnSubscribe() {@Override public void subscribe(ObservableEmitter e) throws Exception {e.onNext("hello");}}).map(new Function() { @Override public String apply(String s) throws Exception { return "helloworld"; } }).subscribe(new Observer{public void onNext(String value){ System.out.println(value);}});
这段代码可以分解为
observableA = Observable.create(new ObservableOnSubscribe() {@Override public void subscribe(ObservableEmitter e) throws Exception {e.onNext("hello");}}); observableB = observableA.map(new Function() { @Override public String apply(String s) throws Exception { return "helloworld"; } });observableB.subscribe(new Observer{public void onNext(String value){ System.out.println(value);}})
observableA经过map后成为observableB,(ObservableA简称A,ObservableB简称B)
B注册了observerF.
B.subscribe(obseberF)
点击查看subscribe的源码,大体可以简化为:
b.subscribeActual();
subscribeActual方法是observalbe类的抽象方法。所有的observable都要重写这个方法。
B是A经过Map后变化而成的observable.
所以B是oberservable的一个实现子类ObservableMap.
该类重写了subscribeActual方法;
source.subscribe(new MapObserver(t, function));
可以看到,MapObserver有两个构造参数。
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper)
一个是Observer,另一个是Fuction.
Observer就是observerF. Fuction可以理解为map变换中的参数。
source就是指的map操作符之前的oberservable。这里就是A。
即执行A.subscribe(new MapObserver(t,function)
同理,A在执行sunscribe时也会执行他的subscribeActual的方法。
举例说 A = Observeable.creat()而来的。所以A是是oberservable的一个实现子类ObservableCreate。
ObservableCreate的subscribeActual内容是
source.subscribe(new MapObserver(t,f));
source是create方法的参数ObservableOnSubscribe。
source.subscribe(parent)的意思就是执行下面方法体中的内容。
public void subscribe(ObservableEmitter
e.onNext("hello");
}
这里的参数e是经过MapObserver包装而成的一个Emitter类型,这里e.onNext时候会调用MapObsever 的onNext方法。
而MapObserver的onNext方法可以简化为
public void onnext()
{
U v;try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) { fail(ex); return;}actual.onNext(v);
}
就是说先执行Map的变换方法。然后把这个方法的结果传递给observerF.
observerF在根据map的返回值调用next方法。
即下面这个方法。
subscribe(new Observer{
public void onNext(String value){
System.out.println(value);
}
};
可以看到代码上我们是从上而下写的。
但在执行时subscribeActual方法是自下而上执行的。而OnNext方法或者apply是从上而下执行的。
了解这个原理后,我们看一下切换线程。
首先来看一下subscribeOn.
observable 在执行subscribeOn后返回了一个ObservableSubscribeOn类型。也是Observable的子类型之一。
看一下它的subscribeActual方法。
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver parent = new SubscribeOnObserver(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); }
意思是所有执行切换线程的那个observable在执行subscribe的时候就切换了线程。
从上一个结论我们知道subscribeActual方法是自下而上执行的。也就是说在切换线程之前的所有的observable的subscribe的线程一直在新线程上运行。
因为是subscribeActual方法调用了onNext方法。所以从上而下执行的onNext方法也是在新线程上运行。
onNext方法一路向下调用,一直到最终注册的Observer.
这就解释了为什么我们明明写了一个subscribeOn为什么所有的OnNext的方法的执行都在新线程上。也就是说subscribe不仅影响之后的onNext的执行线程,也影响之前的onNext线程.
如果一个obserable串写了两个subscribeOn那么逆流而上执行的时候回选用先切为倒数第一个的线程,然后切为倒数第二个,直至切换为第一个的线程。然后自上而下执行onNext的时候就以最后切换的线程为准,即第一次切换的线程。
这就解释了为什么多次执行subscribeOn来切换线程,只有第一个有效的原因。
接下来我们看一下ObserveOn的方法。
observable 在执行subscribeOn后返回了一个ObservableObserveOn类型。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } }
在subscribeActual中并没有直接切换线程。只是将这个线程 w 存入了新的observer中。
那么oberveOn切换线程是在哪里执行的呢。在ObserveOnObserver中,我们看一下他的onNext方法。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
可以看到在onNext的时候,开始部署了新线程。我们看一下新线程开始后会怎样。
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }void drainNormal() { int missed = 1; final SimpleQueue q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
代码中的a.onNext(v)是在run中进行的。就是说这里已经运行在了新线程中。a指的是observer就是自下而上逆行时下游的oberserver.所以observerOn只能影响在其后面的线程。
如果多个observeOn同时使用时,由于obserbeOn影响的是onNext(),onNext自上而下运行。遇到新的observerOn就会切换到新的线程。
所以observeOn影响的是下游的onNext,遇到新的observeOn之后就会切换为新的线程,只对其后面的有效。
更多相关文章
- 浅谈Java中Collections.sort对List排序的两种方法
- Python list sort方法的具体使用
- python list.sort()根据多个关键字排序的方法实现
- android上一些方法的区别和用法的注意事项
- android实现字体闪烁动画的方法
- Android中dispatchDraw分析
- 浅析Android中的消息机制-解决:Only the original thread that cr
- Android四大基本组件介绍与生命周期
- Android(安卓)MediaPlayer 常用方法介绍