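The previous article analyzed the source of subscribeOn: in essence, it runs the upstream Observable.onSubscribe.call(subscriber1) on a thread of the specified Scheduler. observeOn, the subject of this article, works at the other end. Inside the onNext(T value), onCompleted() and onError() of an intermediate Subscriber, it schedules a task on the specified Scheduler, and that task delivers the data and notifications to the downstream Subscriber on the Scheduler's thread.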

As before, we substitute the code step by step:

observable1.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber1);
is equivalent to:

scheduler = AndroidSchedulers.mainThread();
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
observable2 = new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe1, operator));
observable2.subscribe(subscriber1);
A simple substitution gives:

operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
observable2.subscribe(subscriber1);
Replacing subscribe() gives:

operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);

subscriber1.onStart();
(observable2.onSubscribe).call(subscriber1);
Another simple substitution gives:

operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

onSubscribe2.call(subscriber1);
Replacing the last statement gives:

operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

Subscriber<? super T> st = hook.onLift(operator).call(subscriber1);
st.onStart();
onSubscribe1.call(st);
Replacing operator.call gives:

operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

Subscriber<? super T> st =
{
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, subscriber1, delayError, bufferSize);
    parent.init();
    return parent;
}

st.onStart();
onSubscribe1.call(st);
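At this point we can see that when the upstream source emits data, it first reaches the intermediate subscriber ObserveOnSubscriber<T> st. In ObserveOnSubscriber, onNext(T value), onCompleted() and onError() all end by calling schedule(). If no drain is already pending (the counter check), schedule() passes the subscriber itself, which implements Action0, to recursiveScheduler.schedule(this). For AndroidSchedulers.mainThread(), HandlerWorker wraps that action in a ScheduledAction, which is a Runnable, and sends it to the Handler as the callback of a Message via sendMessageDelayed (here with a delay of 0). When the main Looper processes the message, ScheduledAction.run() executes, and the "action" in its "action.call();" is the ObserveOnSubscriber<T> st object. ObserveOnSubscriber.call() in turn ends up calling "localChild.onNext(localOn.getValue(v));", where "localChild" is subscriber1.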
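
The flow described above can be sketched without RxJava or Android. What follows is a minimal illustration under stated assumptions, not the library's implementation: MiniObserveOn is a hypothetical stand-in for ObserveOnSubscriber, a single-thread ExecutorService named "fake-main" plays the role of the Scheduler.Worker, and a Consumer<T> plays the role of subscriber1. Backpressure, terminal events and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for ObserveOnSubscriber: onNext() only enqueues the value
// and schedules a drain; the drain runs on the worker and calls the downstream.
final class MiniObserveOn<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final ExecutorService worker; // assumption: plays the role of Scheduler.Worker
    private final Consumer<T> child;      // assumption: plays the role of subscriber1

    MiniObserveOn(ExecutorService worker, Consumer<T> child) {
        this.worker = worker;
        this.child = child;
    }

    // Called on the upstream (emitting) thread.
    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 submits a drain task.
        if (counter.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    // Runs on the worker thread, in the role of ObserveOnSubscriber.call().
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                child.accept(v);
            }
            // Leave only if no schedule() call arrived while draining.
            missed = counter.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}

final class MiniObserveOnDemo {
    // Emits 1..3 from the calling thread and records "threadName:value" downstream.
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        List<String> received = new ArrayList<>();
        MiniObserveOn<Integer> st = new MiniObserveOn<Integer>(worker,
                v -> received.add(Thread.currentThread().getName() + ":" + v));
        for (int i = 1; i <= 3; i++) {
            st.onNext(i);
        }
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [fake-main:1, fake-main:2, fake-main:3]
    }
}
```

The values are emitted from the calling thread but arrive downstream on "fake-main", in their original order, because only one drain runs at a time and the queue is FIFO.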

The relevant source code is listed below:

// Observable
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}
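The value of RxRingBuffer.SIZE is configurable (through the rx.ring-buffer.size system property); on Android it defaults to 16, and to 128 on other platforms.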

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

// OnSubscribeLift
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
    this.parent = parent;
    this.operator = operator;
}

@Override
public void call(Subscriber<? super R> o) {
    try {
        Subscriber<? super T> st = hook.onLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}
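
The substitution chain at the start of the article follows directly from lift and OnSubscribeLift.call. As a rough sketch, assuming heavily reduced, hypothetical types (MiniSubscriber, MiniOnSubscribe, MiniOperator and MiniObservable are not RxJava classes, and the plugin hook, onStart() and error handling are omitted), the wrapping looks like this:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily reduced counterparts of Subscriber, OnSubscribe and Operator.
interface MiniSubscriber<T> {
    void onNext(T value);
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<T> subscriber);
}

interface MiniOperator<R, T> {
    // Given the downstream subscriber, return the subscriber handed to the upstream.
    MiniSubscriber<T> call(MiniSubscriber<R> child);
}

final class MiniObservable<T> {
    final MiniOnSubscribe<T> onSubscribe;

    MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Same shape as lift(): the new OnSubscribe wraps the child, then calls the parent.
    <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
        MiniOnSubscribe<T> parent = this.onSubscribe;
        return new MiniObservable<R>(child -> {
            MiniSubscriber<T> st = operator.call(child);
            parent.call(st);
        });
    }

    void subscribe(MiniSubscriber<T> subscriber) {
        onSubscribe.call(subscriber);
    }
}

final class MiniLiftDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> observable1 = new MiniObservable<Integer>(s -> {
            s.onNext(1);
            s.onNext(2);
        });
        // An operator whose intermediate subscriber logs, then forwards to the child.
        MiniOperator<Integer, Integer> operator = child -> value -> {
            log.add("st.onNext(" + value + ")");
            child.onNext(value);
        };
        MiniObservable<Integer> observable2 = observable1.lift(operator);
        observable2.subscribe(value -> log.add("subscriber1.onNext(" + value + ")"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Every value passes through the intermediate subscriber st before it reaches subscriber1, which is exactly the position ObserveOnSubscriber occupies.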

// OperatorObserveOn
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

// ObserveOnSubscriber
// static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
}

void init() {
    // don't want this code in the constructor because `this` can escape through the
    // setProducer call
    Subscriber<? super T> localChild = child;

    localChild.setProducer(new Producer() {

        @Override
        public void request(long n) {
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                schedule();
            }
        }

    });
    localChild.add(recursiveScheduler);
    localChild.add(this);
}

protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}

@Override
public void onNext(final T t) {
    if (isUnsubscribed() || finished) {
        return;
    }
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    }
    schedule();
}

@Override
public void onCompleted() {
    if (isUnsubscribed() || finished) {
        return;
    }
    finished = true;
    schedule();
}

@Override
public void onError(final Throwable e) {
    if (isUnsubscribed() || finished) {
        RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
        return;
    }
    error = e;
    finished = true;
    schedule();
}

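public void call() {
    // abridged excerpt: the omitted lines poll v from the queue, check for
    // termination, advance currentEmission and leave the loop once no work is left
    for (;;) {
        long requestAmount = requested.get();

        while (requestAmount != currentEmission) {

            localChild.onNext(localOn.getValue(v));
        }
    }
}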
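
The interplay between request(n) in init(), the counter in schedule() and the loop in call() is easier to follow in a deterministic toy version. The sketch below is an assumption-laden reduction, not the RxJava code: RequestDrain is a hypothetical class, the worker is replaced by a plain queue of pending tasks that the demo runs by hand, the child is a list, and requested is updated with a plain addAndGet (no overflow capping, unlike BackpressureUtils).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of the request-aware drain loop; tasks are run by hand
// so that the example is deterministic.
final class RequestDrain implements Runnable {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final AtomicInteger counter = new AtomicInteger();
    final Queue<Runnable> pendingTasks = new ArrayDeque<>(); // stand-in for the worker
    final List<Integer> delivered = new ArrayList<>();       // stand-in for the child
    long emitted;

    void onNext(int value) {
        queue.offer(value);
        schedule();
    }

    void request(long n) {
        if (n > 0L) {
            requested.addAndGet(n); // simplification: no cap at Long.MAX_VALUE
            schedule();
        }
    }

    void schedule() {
        // Several calls before the drain runs still enqueue a single task.
        if (counter.getAndIncrement() == 0) {
            pendingTasks.add(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            long requestAmount = requested.get();
            // Emit only while the downstream has outstanding demand.
            while (requestAmount != emitted) {
                Integer v = queue.poll();
                if (v == null) {
                    break;
                }
                delivered.add(v);
                emitted++;
            }
            missed = counter.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    void runPendingTasks() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
    }
}

final class RequestDrainDemo {
    static String run() {
        RequestDrain d = new RequestDrain();
        d.onNext(1);
        d.onNext(2);
        d.onNext(3);
        int tasksAfterThreeOnNext = d.pendingTasks.size();
        d.runPendingTasks();                 // nothing requested yet, so nothing is delivered
        String beforeRequest = d.delivered.toString();
        d.request(2);                        // new demand schedules another drain
        d.runPendingTasks();
        return tasksAfterThreeOnNext + " " + beforeRequest + " " + d.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 1 [] [1, 2]
    }
}
```

Three onNext calls enqueue only one task, the first drain delivers nothing because no demand exists, and request(2) releases exactly two of the three buffered values.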

// AndroidSchedulers
public static Scheduler mainThread() {
    return getInstance().mainThreadScheduler;
}
// where mainThreadScheduler is initialized as:
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());

// LooperScheduler
public Worker createWorker() {
    return new HandlerWorker(handler);
}

// HandlerWorker
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (unsubscribed) {
        return Subscriptions.unsubscribed();
    }

    action = hook.onSchedule(action);

    ScheduledAction scheduledAction = new ScheduledAction(action, handler);

    Message message = Message.obtain(handler, scheduledAction);
    message.obj = this; // Used as token for unsubscription operation.

    handler.sendMessageDelayed(message, unit.toMillis(delayTime));

    if (unsubscribed) {
        handler.removeCallbacks(scheduledAction);
        return Subscriptions.unsubscribed();
    }

    return scheduledAction;
}

@Override
public Subscription schedule(final Action0 action) {
    return schedule(action, 0, TimeUnit.MILLISECONDS);
}
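
The shape of HandlerWorker.schedule can be imitated on a plain JVM. The sketch below is only an analogy under stated assumptions: a single-thread ScheduledExecutorService stands in for android.os.Handler and the main Looper, MiniWorker is a hypothetical class, and it returns null where the real worker returns Subscriptions.unsubscribed().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical analogue of HandlerWorker backed by an executor instead of a Handler.
final class MiniWorker {
    private final ScheduledExecutorService loop; // assumption: stands in for Handler + Looper
    private volatile boolean unsubscribed;

    MiniWorker(ScheduledExecutorService loop) {
        this.loop = loop;
    }

    // Returns null when the worker is already unsubscribed.
    ScheduledFuture<?> schedule(Runnable action, long delayTime, TimeUnit unit) {
        if (unsubscribed) {
            return null;
        }
        return loop.schedule(action, delayTime, unit);
    }

    // The no-delay overload delegates with a delay of 0, as in HandlerWorker.
    ScheduledFuture<?> schedule(Runnable action) {
        return schedule(action, 0, TimeUnit.MILLISECONDS);
    }

    void unsubscribe() {
        unsubscribed = true;
    }
}

final class MiniWorkerDemo {
    static String run() {
        ScheduledExecutorService loop =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "fake-main"));
        try {
            MiniWorker worker = new MiniWorker(loop);
            AtomicReference<String> ranOn = new AtomicReference<>("not run");
            ScheduledFuture<?> first =
                    worker.schedule(() -> ranOn.set(Thread.currentThread().getName()));
            first.get(5, TimeUnit.SECONDS);   // wait until the action has run
            worker.unsubscribe();
            ScheduledFuture<?> second = worker.schedule(() -> ranOn.set("should not run"));
            return ranOn.get() + "," + (second == null);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // fake-main,true
    }
}
```

The first action runs on the loop thread, and once the worker is unsubscribed, further schedule calls are rejected without running anything.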

// ScheduledAction
// final class ScheduledAction implements Runnable, Subscription

@Override
public void run() {
    try {
        action.call();
    } catch (Throwable e) {
        // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
        IllegalStateException ie;
        if (e instanceof OnErrorNotImplementedException) {
            ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
        } else {
            ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
        }
        RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
        Thread thread = Thread.currentThread();
        thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
    }
}



