现在RxJava在Android界已经是无人不知无人不晓的技术了,如果你还不知道RxJava是什么,推荐你去看给 Android 开发者的 RxJava 详解,而本篇博客主要是通过一些例子和源码分析以加深对RxJava的理解,所以看本篇博客时请确保你已对RxJava有了一定的了解。

create

我们先来看个最简单的例子:

    public static void create() {        Observable.create(new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                Log.d("demo", "call");                subscriber.onNext("Martino");                subscriber.onCompleted();            }        }).subscribe(new Subscriber<String>() {            @Override            public void onCompleted() {                Log.d("demo", "onCompleted");            }            @Override            public void onError(Throwable e) {                Log.d("demo", "onError:" + e.toString());            }            @Override            public void onNext(String s) {                Log.d("demo", "onNext:" + s);            }        });    }

看下打印结果:

callonNext:MartinoonCompleted

这就跟我们之前的做法相似,我们在异步线程里执行任务,然后通过回调通知到主线程。而这里OnSubscribe的call()方法则有些类似于
Runnable的run()方法,Subscriber则类似于Callback。上面是一次事件的正常流程(为了好理解我尽量会避免出现“序列”等一些太专业的词)。但是如果我们的call()方法中有异常呢?
我们之前的做法是要不捕获异常要不抛出异常,那RxJava需要我们收到处理异常吗?我们在call()方法里制造一个异常,看看会发生什么?

    public static void create() {        Observable.create(new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                Log.d("demo", "call");                subscriber.onNext("Martino");                int a = 2 / 0;                subscriber.onNext("Hello");                subscriber.onCompleted();            }        }).subscribe(new Subscriber<String>() {                         @Override                         public void onCompleted() {                             Log.d("demo", "onCompleted");                         }                         @Override                         public void onError(Throwable e) {                             Log.d("demo", "onError:" + e.toString());                         }                         @Override                         public void onNext(String s) {                             Log.d("demo", "onNext:" + s);                         }                     }        );    }

打印结果如下:

callonNext:MartinoonError:java.lang.ArithmeticException: divide by zero

我们看到,虽然我们没有处理异常,但是RxJava将异常直接通过onError传递到了Subscriber,并且onError与onCompleted是互斥的,最多只能出现一个。所以我们使用RxJava后不需要再在call()方法里处理异常了。

我们看下subscribe()方法里到底做了什么事情,hook只是说我们可以在返回前额外做一些事情,我们替换掉hook之后,代码如下:

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {        ...        subscriber.onStart();          ...        try {             observable.onSubscribe.call(subscriber);            return subscriber;        } catch (Throwable e) {          subscriber.onError(e);        }        return Subscriptions.unsubscribed();    }

其中参数subscriber是我们自定义的,而observable是我们create()是生成的,这里并没有看到onNext和onComplete,这两个是我们自己在call()方法里主动发起的,所以subscribe()里的流程是:subscriber.onStart() ->
observable.onSubscribe.call(subscriber) -> subscriber.onError(e),然后返回的是subscriber,而有异常时则直接会unsubscribed()。

from

上面构建Observable对象我们使用了自定义的OnSubscribe,那如果是通过from或just构建的话,我们并没有自定义OnSubscribe,那又是怎么交互的呢?

    public static void from() {        String[] names = {"张三", "李四", "王五"};        Subscriber<String> subscriber = new Subscriber<String>() {            @Override            public void onCompleted() {                Log.d("demo", "onCompleted");            }            @Override            public void onError(Throwable e) {                Log.d("demo", "onError:" + e.toString());            }            @Override            public void onNext(String s) {                Log.d("demo", "onNext:" + s);            }        };        Observable.from(names).subscribe(subscriber);    }

打印结果如下:

onNext:张三onNext:李四onNext:王五onCompleted

我们并没有定义onNext和onCompleted,打印的时候却出现了,那就说明系统一定帮我们指定了一个onSubscribe,并在其call()方法里调用了onNext和onCompleted,我们还是去看看源码:

       public final static <T> Observable<T> from(Iterable<? extends T> iterable) {        return create(new OnSubscribeFromIterable<T>(iterable));    }

这里通过create()创建了一个Observable对象,并根据iterable新建了一个OnSubscribeFromIterable对象,其实现了onSubscribe接口,所以当我们subscribe()后,调用observable.onSubscribe.call(subscriber)时,实际上调用的是
OnSubscribeFromIterable的call()方法,我们看看做了什么:

    @Override    public void call(final Subscriber<? super T> o) {        final Iterator<? extends T> it = is.iterator();        if (!it.hasNext() && !o.isUnsubscribed())            o.onCompleted();        else             o.setProducer(new IterableProducer<T>(o, it));    }

我们看到:如果Iterator为空就会调用onCompleted()方法,否则会调用Subscriber的setProducer()方法,那想必onNext()一定是在setProducer()的时候调用到了,而该方法内部调用了Producer的request()方法,默认会走到fastpath()。

    @Override    public void request(long n) {        ...        fastpath();        ...    }

再来看看fastpath()里又是什么:

        void fastpath() {            final Subscriber<? super T> o = this.o;            final Iterator<? extends T> it = this.it;            while (true) {                if (o.isUnsubscribed()) {                    return;                } else if (it.hasNext()) {                    o.onNext(it.next());                } else if (!o.isUnsubscribed()) {                    o.onCompleted();                    return;                } else {                    return;                }            }        }

终于看到onNext()了,这个方法会迭代Iterator,然后将Iterator的当前元素通过onNext()传递到subscriber里,最后onCompleted()结束。

稍微总结下:当我们使用from()构造了Observable对象并自定义 subscriber后,当调用subscribe()时,系统会自动生成OnSubscribeFromIterable,并调用其call()方法,其内部最终会遍历Iterator,并将其值通过onNext()传递给subscriber,最后回调subscriber的onCompleted()。

just

接下来我们再看看just()的使用方法:

    public static void just() {        Subscriber<String> subscriber = new Subscriber<String>() {            @Override            public void onCompleted() {                Log.d("demo", "onCompleted");            }            @Override            public void onError(Throwable e) {                Log.d("demo", "onError:" + e.toString());            }            @Override            public void onNext(String s) {                Log.d("demo", "onNext:" + s);            }        };        Observable observable = Observable.just("张三", "李四", "王五");        observable.subscribe(subscriber);    }

输出的结果跟上面一样:

onNext:张三onNext:李四onNext:王五onCompleted

我们来看看源码,首先看构造的Observable对象:

当只有一个参数时:

    public final static <T> Observable<T> just(final T value) {        return ScalarSynchronousObservable.create(value);    }

当有多个参数时:

    public final static <T> Observable<T> just(T t1, T t2, T t3) {        return from(Arrays.asList(t1, t2, t3));    }

我们发现多于1个时,跟from()效果是一样的,不过最多只能有8个参数

主要看看只有一个参数时这个create()方法里是什么:

    public static final <T> ScalarSynchronousObservable<T> create(T t) {        return new ScalarSynchronousObservable<T>(t);    }

新建了一个ScalarSynchronousObservable对象,内部生成了一个OnSubscribe对象,这里我们发现,call()方法里没有了同步锁的操作,性能上要有一定的提升

    protected ScalarSynchronousObservable(final T t) {        super(new OnSubscribe<T>() {            @Override            public void call(Subscriber<? super T> s) {                s.onNext(t);                s.onCompleted();            }        });        this.t = t;    }

而只有一个参数时我们试试:

    public static void just() {        Subscriber<String> subscriber = new Subscriber<String>() {            @Override            public void onCompleted() {                Log.d("demo", "onCompleted");            }            @Override            public void onError(Throwable e) {                Log.d("demo", "onError:" + e.toString());            }            @Override            public void onNext(String s) {                Log.d("demo", "onNext:" + s);            }        };        Observable observable = Observable.just("张三");        observable.subscribe(subscriber);    }

打印如下:

onNext:张三onCompleted

我们看到,from()和just()构造Observable的不同之处在于,from()是直接传入了一个数组,而just()则是通过传入多个参数,而just()有多于1个参数时,两者其实是一样的。

最后,附上一张流程图。

更多相关文章

  1. 2种方式进行Spinner数据的添加
  2. Android中Matrix的pre post set方法理解
  3. Android消息处理探秘
  4. Android中invalidate()和postInvalidate() 的区别及使用方法
  5. Android(安卓)修改host文件的3种方法
  6. 【Android】定位与解决anr错误记录
  7. Retrofit源码导读
  8. Android(安卓)Service两种启动方式详解(总结版)
  9. 几种判断应用(Android(安卓)App)前后台状态的方法

随机推荐

  1. Android定时自动启动应用程序
  2. android edittext password hint字体不同
  3. android调用系统(相机)的图片,并且返回
  4. Android监听未接来电
  5. android调用浏览器打开网页链接
  6. Eclipse配置Android开发环境
  7. Android Failed to allocate memory: 145
  8. android 打开各种文件的方式
  9. MediaExtractor的seekTo方法精确定位到指
  10. Android 手写和笔锋研究资料