深入解析RxJava源码(一)Observable对象的构建
现在RxJava在Android界已经是无人不知无人不晓的技术了,如果你还不知道RxJava是什么,推荐你去看给 Android 开发者的 RxJava 详解,而本篇博客主要是通过一些例子和源码分析以加深对RxJava的理解,所以看本篇博客时请确保你已对RxJava有了一定的了解。
create
我们先来看个最简单的例子:
public static void create() { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.d("demo", "call"); subscriber.onNext("Martino"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { Log.d("demo", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("demo", "onError:" + e.toString()); } @Override public void onNext(String s) { Log.d("demo", "onNext:" + s); } }); }
看下打印结果:
callonNext:MartinoonCompleted
这就跟我们之前的做法相似,我们在异步线程里执行任务,然后通过回调通知到主线程。而这里OnSubscribe的call()方法则有些类似于
Runnable的run()方法,Subscriber则类似于Callback。上面是一次事件的正常流程(为了好理解我尽量会避免出现“序列”等一些太专业的词)。但是如果我们的call()方法中有异常呢?
我们之前的做法是要不捕获异常要不抛出异常,那RxJava需要我们收到处理异常吗?我们在call()方法里制造一个异常,看看会发生什么?
public static void create() { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.d("demo", "call"); subscriber.onNext("Martino"); int a = 2 / 0; subscriber.onNext("Hello"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { Log.d("demo", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("demo", "onError:" + e.toString()); } @Override public void onNext(String s) { Log.d("demo", "onNext:" + s); } } ); }
打印结果如下:
callonNext:MartinoonError:java.lang.ArithmeticException: divide by zero
我们看到,虽然我们没有处理异常,但是RxJava将异常直接通过onError传递到了Subscriber,并且onError与onCompleted是互斥的,最多只能出现一个。所以我们使用RxJava后不需要再在call()方法里处理异常了。
我们看下subscribe()方法里到底做了什么事情,hook只是说我们可以在返回前额外做一些事情,我们替换掉hook之后,代码如下:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); ... try { observable.onSubscribe.call(subscriber); return subscriber; } catch (Throwable e) { subscriber.onError(e); } return Subscriptions.unsubscribed(); }
其中参数subscriber是我们自定义的,而observable是我们create()是生成的,这里并没有看到onNext和onComplete,这两个是我们自己在call()方法里主动发起的,所以subscribe()里的流程是:subscriber.onStart() ->
observable.onSubscribe.call(subscriber) -> subscriber.onError(e),然后返回的是subscriber,而有异常时则直接会unsubscribed()。
from
上面构建Observable对象我们使用了自定义的OnSubscribe,那如果是通过from或just构建的话,我们并没有自定义OnSubscribe,那又是怎么交互的呢?
public static void from() { String[] names = {"张三", "李四", "王五"}; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.d("demo", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("demo", "onError:" + e.toString()); } @Override public void onNext(String s) { Log.d("demo", "onNext:" + s); } }; Observable.from(names).subscribe(subscriber); }
打印结果如下:
onNext:张三onNext:李四onNext:王五onCompleted
我们并没有定义onNext和onCompleted,打印的时候却出现了,那就说明系统一定帮我们指定了一个onSubscribe,并在其call()方法里调用了onNext和onCompleted,我们还是去看看源码:
public final static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable)); }
这里通过create()创建了一个Observable对象,并根据iterable新建了一个OnSubscribeFromIterable对象,其实现了onSubscribe接口,所以当我们subscribe()后,调用observable.onSubscribe.call(subscriber)时,实际上调用的是
OnSubscribeFromIterable的call()方法,我们看看做了什么:
@Override public void call(final Subscriber<? super T> o) { final Iterator<? extends T> it = is.iterator(); if (!it.hasNext() && !o.isUnsubscribed()) o.onCompleted(); else o.setProducer(new IterableProducer<T>(o, it)); }
我们看到:如果Iterator为空就会调用onCompleted()方法,否则会调用Subscriber的setProducer()方法,那想必onNext()一定是在setProducer()的时候调用到了,而该方法内部调用了Producer的request()方法,默认会走到fastpath()。
@Override public void request(long n) { ... fastpath(); ... }
再来看看fastpath()里又是什么:
void fastpath() { final Subscriber<? super T> o = this.o; final Iterator<? extends T> it = this.it; while (true) { if (o.isUnsubscribed()) { return; } else if (it.hasNext()) { o.onNext(it.next()); } else if (!o.isUnsubscribed()) { o.onCompleted(); return; } else { return; } } }
终于看到onNext()了,这个方法会迭代Iterator,然后将Iterator的当前元素通过onNext()传递到subscriber里,最后onCompleted()结束。
稍微总结下:当我们使用from()构造了Observable对象并自定义 subscriber后,当调用subscribe()时,系统会自动生成OnSubscribeFromIterable,并调用其call()方法,其内部最终会遍历Iterator,并将其值通过onNext()传递给subscriber,最后回调subscriber的onCompleted()。
just
接下来我们再看看just()的使用方法:
public static void just() { Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.d("demo", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("demo", "onError:" + e.toString()); } @Override public void onNext(String s) { Log.d("demo", "onNext:" + s); } }; Observable observable = Observable.just("张三", "李四", "王五"); observable.subscribe(subscriber); }
输出的结果跟上面一样:
onNext:张三onNext:李四onNext:王五onCompleted
我们来看看源码,首先看构造的Observable对象:
当只有一个参数时:
public final static <T> Observable<T> just(final T value) { return ScalarSynchronousObservable.create(value); }
当有多个参数时:
public final static <T> Observable<T> just(T t1, T t2, T t3) { return from(Arrays.asList(t1, t2, t3)); }
我们发现多于1个时,跟from()效果是一样的,不过最多只能有8个参数
主要看看只有一个参数时这个create()方法里是什么:
public static final <T> ScalarSynchronousObservable<T> create(T t) { return new ScalarSynchronousObservable<T>(t); }
新建了一个ScalarSynchronousObservable对象,内部生成了一个OnSubscribe对象,这里我们发现,call()方法里没有了同步锁的操作,性能上要有一定的提升
protected ScalarSynchronousObservable(final T t) { super(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> s) { s.onNext(t); s.onCompleted(); } }); this.t = t; }
而只有一个参数时我们试试:
public static void just() { Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.d("demo", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("demo", "onError:" + e.toString()); } @Override public void onNext(String s) { Log.d("demo", "onNext:" + s); } }; Observable observable = Observable.just("张三"); observable.subscribe(subscriber); }
打印如下:
onNext:张三onCompleted
我们看到,from()和just()构造Observable的不同之处在于,from()是直接传入了一个数组,而just()则是通过传入多个参数,而just()有多于1个参数时,两者其实是一样的。
最后,附上一张流程图。
更多相关文章
- 2种方式进行Spinner数据的添加
- Android中Matrix的pre post set方法理解
- Android消息处理探秘
- Android中invalidate()和postInvalidate() 的区别及使用方法
- Android(安卓)修改host文件的3种方法
- 【Android】定位与解决anr错误记录
- Retrofit源码导读
- Android(安卓)Service两种启动方式详解(总结版)
- 几种判断应用(Android(安卓)App)前后台状态的方法