本文基于RxJava2.2.1版本分析。

简介

官方介绍:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava是一个处理异步操作的库、基于事件流的库

其核心的东西不外乎两个, Observable(被观察者)和 Observer(观察者)。当观察者订阅了被观察者之后,被观察者可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后将结果交给观察者处理。

create

create()是创建Observable对象的方法之一,还有其他诸多的创建方法,这里主要了解下Observable的创建流程:

        // 创建被观察者        Observable observable = Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                emitter.onNext(1);                emitter.onComplete();            }        });

进入create():

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {        // requireNonNull()很多地方会用到,顾名思义就是用来判空的        ObjectHelper.requireNonNull(source, "source is null");        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));    }

进入onAssembly():

    public static  Observable onAssembly(@NonNull Observable source) {        Function<? super Observable, ? extends Observable> f = onObservableAssembly;        if (f != null) {            // 这里hook            return apply(f, source);        }        return source;    }

onAssembly()主要是用来hook的,默认使用中都没有hook,返回的就是入参ObservableCreate。我们来看一下ObservableCreate:

    final ObservableOnSubscribe source;    public ObservableCreate(ObservableOnSubscribe source) {        this.source = source;    }

这里我们看到ObservableCreate将最初我们创建的ObservableOnSubscribe包装了一层,用成员变量source记住。

至此,创建流程结束,我们得到了Observable< T >对象,其实就是ObservableCreate< T >.

subscribe

承接上文的示例代码:

        // 创建观察者        Observer observer = new Observer() {            @Override            public void onSubscribe(Disposable d) {                Log.d(TAG, "subscribe");            }            @Override            public void onNext(Integer value) {                Log.d(TAG, "" + value);            }            @Override            public void onError(Throwable e) {                Log.d(TAG, "error");            }            @Override            public void onComplete() {                Log.d(TAG, "complete");            }        };        // 观察者和被观察者建立连接        observable.subscribe(observer);  

进入subscribe():

    public final void subscribe(Observer<? super T> observer) {        ObjectHelper.requireNonNull(observer, "observer is null");        try {            observer = RxJavaPlugins.onSubscribe(this, observer);            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");            subscribeActual(observer);        } catch (NullPointerException e) { // NOPMD            throw e;        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            // can't call onError because no way to know if a Disposable has been set or not            // can't call onSubscribe because the call might have set a Subscription already            RxJavaPlugins.onError(e);            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");            npe.initCause(e);            throw npe;        }    }

这里的关键代码只有一句subscribeActual(observer);被观察者被订阅时真正被执行的方法

    protected void subscribeActual(Observer<? super T> observer) {        CreateEmitter parent = new CreateEmitter(observer);         // 这里首先调用了observer的onSubscribe()        observer.onSubscribe(parent);        try {            // 然后调用了source的subscribe()            // 这里的sourec就是之前保存的ObservableCreate            // subscribe()就是我们自己一开始实现的方法            source.subscribe(parent);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            parent.onError(ex);        }    }

CreateEmitter是啥,它是一个事件的发射器。当我们在ObservableCreate的subscribe()中调用CreateEmitter的onNext()、onError()或者onComplete()方法时,它会去调用observer的onNext()、onError()或者onComplete()方法。

CreateEmitter parent = new CreateEmitter(observer);一句中,大概可以猜到,CreateEmitter持有了observer,确实如此:

 @Override        public void onNext(T t) {            if (t == null) {                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));                return;            }            if (!isDisposed()) {                observer.onNext(t);            }        }        @Override        public void onError(Throwable t) {            if (!tryOnError(t)) {                RxJavaPlugins.onError(t);            }        }        @Override        public boolean tryOnError(Throwable t) {            if (t == null) {                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");            }            if (!isDisposed()) {                try {                    observer.onError(t);                } finally {                    dispose();                }                return true;            }            return false;        }        @Override        public void onComplete() {            if (!isDisposed()) {                try {                    observer.onComplete();                } finally {                    dispose();                }            }        }

我们可以看到CreateEmitter的onNext()等方法被调用时,其中会调用对应的observer的onNext()等方法。

以上是最简单的流程,那在其中加入一个操作符会怎么样呢?我们来看一下map是做什么的。

map

map起到类型转换的作用,例子如下,源头observable发送的是String类型的数字,利用map转换成int型,最终在终点observer接受到的也是int类型数据:

        Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext("1");                e.onComplete();            }        }).map(new Function() {            @Override            public Integer apply(String s) throws Exception {                return Integer.parseInt(s);            }        }).subscribe(new Observer() {            @Override            public void onSubscribe(Disposable d) {                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");            }            @Override            public void onNext(Integer value) {                Log.d(TAG, "onNext() called with: value = [" + value + "]");            }            @Override            public void onError(Throwable e) {                Log.d(TAG, "onError() called with: e = [" + e + "]");            }            @Override            public void onComplete() {                Log.d(TAG, "onComplete() called");            }        });

来看它的源码:

    public final  Observable map(Function<? super T, ? extends R> mapper) {        ObjectHelper.requireNonNull(mapper, "mapper is null");        return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));    }

抛开onAssembly(),好像只是创建了一个ObservableMap并且返回:

    public ObservableMap(ObservableSource source, Function<? super T, ? extends U> function) {        // 将传入的observable记住        super(source);        // 将变换的方法记住        this.function = function;    }    @Override    public void subscribeActual(Observer<? super U> t) {        source.subscribe(new MapObserver(t, function));    }

这里和之前看到的ObservableCreate如出一辙,将上一个observable包装了起来。

然后当subscribeActual()执行的时候,去调用source的subscribe(),这里的source指的是上游的observable。并且创建了一个新的observer,MapObserver将下游的observer包装了一层。

如果多次map()的话,我们可以将函数调用的流程分为三步:

  • 首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
  • observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
  • 当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。

假设当前的observer是MapObserver,在它的onNext()被调用时会调用它的变换方法,然后继续调用下游observer的onNext(),如下:

       public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != NONE) {                downstream.onNext(null);                return;            }            U v;            try {                // 调用变换方法mapper.apply(t)                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");            } catch (Throwable ex) {                fail(ex);                return;            }            // 调用下游observer的onNext()            downstream.onNext(v);        }

最终调用最后一个observer的onNext()。

subscribeOn

用来指定subscribe()发生的线程的:

    public final Observable subscribeOn(Scheduler scheduler) {        ObjectHelper.requireNonNull(scheduler, "scheduler is null");        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));    }

这里返回了一个ObservableSubscribeOn

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {        super(source);        this.scheduler = scheduler;    }    @Override    public void subscribeActual(final Observer<? super T> observer) {        final SubscribeOnObserver parent = new SubscribeOnObserver(observer);        observer.onSubscribe(parent);        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));    }

这里做法其实和以上的提到过的ObservableCreate和ObservableMap中的是一样的,包装了上游的observable,并且额外记录了subscribe()的线程调度器

当subscribeActual()被执行的时候,会切换到线程调度器所指定的线程。

那么如果subscribeOn(Schedulers.xxx())切换线程N次,总是以第一次为准,或者说离源observable最近的那次为准,并且对其上面的代码生效。

observeOn

指定onNext()等方法执行的线程:

    public final Observable observeOn(Scheduler scheduler) {        return observeOn(scheduler, false, bufferSize());    }    public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {        ObjectHelper.requireNonNull(scheduler, "scheduler is null");        ObjectHelper.verifyPositive(bufferSize, "bufferSize");        return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));    }

还是似曾相识的场景,创建了一个ObservableObserveOn:

    public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) {        super(source);        this.scheduler = scheduler;        this.delayError = delayError;        this.bufferSize = bufferSize;    }    @Override    protected void subscribeActual(Observer<? super T> observer) {        if (scheduler instanceof TrampolineScheduler) {            source.subscribe(observer);        } else {            Scheduler.Worker w = scheduler.createWorker();            source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));        }    }

自然少不了对原来observable的包装,同时记录了线程调度器

当从上游push消息过来的时候,我们会看到ObservableObserveOn会切换到所记录的线程调度器指定的线程:

        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != QueueDisposable.ASYNC) {                queue.offer(t);            }            schedule();        }        void schedule() {            if (getAndIncrement() == 0) {                worker.schedule(this);            }        }

以上是RxJava2最基本方法的一些源码分析。

链式调用原理

我们可以将函数调用的流程分为三步:

  • 首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
  • observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
  • 当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。

实战用例

以下是我写的demo的部分代码,其中模拟了两个RxJava的实战场景:

  1. 注册登录
  2. 每次进入页面,按顺序请求接口,展示弹窗(避免多个弹窗重叠)。
    public static final int REGISTER_SUCCESS_LOGIN_SUCCESS = 0;     // 注册成功, 登录成功    public static final int REGISTER_SUCCESS_LOGIN_FAIL = 1;        // 注册成功, 登录失败    public static final int REGISTER_FAIL = 2;                      // 注册失败    @Override    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_rx_java_test);    }    public void onLoginSuccessClick(View view) {        gotoRegister(REGISTER_SUCCESS_LOGIN_SUCCESS);    }    public void onLoginFailClick(View view) {        gotoRegister(REGISTER_SUCCESS_LOGIN_FAIL);    }    public void onRegisterFailClick(View view) {        gotoRegister(REGISTER_FAIL);    }    /**     * 按序加载信息     *     * @param view     */    public void onLoadInfoInSequenceClick(View view) {        int index = 0;        loadSingleInfo(index).observeOn(Schedulers.io())                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return loadSingleInfo(integer);                    }                })                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Consumer() {                    @Override                    public void accept(Integer integer) throws Exception {                        ToastUtil.showToast("new info are all loaded");                    }                });    }    /**     * 发起注册请求     *     * @param type     */    private void gotoRegister(final int type) {        register(type).subscribeOn(Schedulers.io())                .observeOn(Schedulers.io())                .flatMap(new Function>() {                    @Override                    public ObservableSource apply(Integer integer) throws Exception {                        return login(type);                    }                })                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                        ToastUtil.showToast("sending request...");                    }                    @Override                    public void onNext(Integer integer) {                        ToastUtil.showToast("login successfully");                    }                    @Override                    public void onError(Throwable e) {                        ToastUtil.showToast(e.getMessage());                    }                    @Override                    public void onComplete() {                    }                });    }    /**     * 注册     *     * @param type     * @return     */    private Observable register(final int type) {        return Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                // 模拟网络请求                SystemClock.sleep(1000);                if (registerSuccess(type)) {                    // 注册成功                    emitter.onNext(type);                } else {                    // 注册失败                    emitter.onError(new Exception("fail to register!"));                }            }        });    }    /**     * 登录     *     * @param type     * @return     */    private Observable login(final int type) {        return Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                // 模拟网络请求                SystemClock.sleep(1000);                if (loginSuccess(type)) {                    // 登录成功                    emitter.onNext(type);                } else {                    // 登录失败                    emitter.onError(new Exception("fail to login!"));                }            }        });    }    /**     * 是否注册成功     *     * @param type     * @return     */    private boolean registerSuccess(int type) {        return type != REGISTER_FAIL;    }    /**     * 是否登录成功     *     * @param type     * @return     */    private boolean loginSuccess(int type) {        return type == REGISTER_SUCCESS_LOGIN_SUCCESS;    }    /**     * 获取单个信息     *     * @param index     * @return     */    private Observable loadSingleInfo(final int index) {        return Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(final ObservableEmitter emitter) throws Exception {                // 模拟网络请求                SystemClock.sleep(200);                if (hasLoaded()) {                    emitter.onNext(index + 1);                } else {                    runOnUiThread(new Runnable() {                        @Override                        public void run() {                            showAlertDialog(index, new DialogInterface.OnDismissListener() {                                @Override                                public void onDismiss(DialogInterface dialog) {                                    emitter.onNext(index + 1);                                }                            });                        }                    });                }            }        });    }    /**     * 是否展示过该信息     *     * @return     */    private boolean hasLoaded() {        long currTime = SystemClock.currentThreadTimeMillis();        return currTime % 2 == 0;    }    /**     * 展示某个信息对应弹窗     *     * @param index     * @param dismissListener     */    private void showAlertDialog(int index, final DialogInterface.OnDismissListener dismissListener) {        new AlertDialog.Builder(this)                .setTitle("Dialog index " + index)                .setOnDismissListener(new DialogInterface.OnDismissListener() {                    @Override                    public void onDismiss(DialogInterface dialog) {                        if (dismissListener != null) {                            dismissListener.onDismiss(dialog);                        }                    }                })                .create()                .show();    }

demo的源码地址。

参考:
1.rxjava源码分析
2.RxJava2 源码解析(一)
3.RxJava2 源码解析(二)

更多相关文章

  1. Intent的定义及用法
  2. android计时器 message+handler; timer+timertask
  3. 禁止viewpager左右滑动
  4. Android(安卓)Java 与 C++ 调用过程中的常量,路径名、文件名、后
  5. 安卓全局获取Context
  6. Android(安卓)Studio之导入外部so库
  7. Identifying Sensors and Sensor Capabilities
  8. android ViewGroup左右滑屏方法2
  9. android测量心率的实现方法

随机推荐

  1. sqlite3使用教程1 SQLite 命令
  2. Linux系统下安装MySql 5.7.17 全过程
  3. SQL Sever数据库卡事务
  4. 查询自定义VO对象的sql
  5. 如果没有明确使用ISNULL,则左连接失败
  6. qt sql多重条件查询简便方法
  7. SQLServer行转列的问题
  8. 数据库操作类实现(C#,SqlClient)
  9. TTable怎样实现类似SQL Select..form...w
  10. 重复的数据只取一条,请问SQL语句怎么写