前言

  • Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。

Github截图

如果还不了解 RxJava,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程

  • RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求
  • 今天,我将为大家详细介绍RxJava操作符中最常用的 功能性操作符,并附带 Retrofit 结合 RxJava的实例Demo教学,希望你们会喜欢。
  1. 本系列文章主要基于 Rxjava 2.0
  2. 接下来的时间,我将持续推出 AndroidRxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注Carson_Ho的安卓开发笔记!!

示意图


目录

示意图


1. 作用

辅助被观察者(Observable) 在发送事件时实现一些功能性需求

如错误处理、线程调度等等


2. 类型

  • RxJava 2 中,常见的功能性操作符 主要有:

示意图

  • 下面,我将对每个操作符进行详细讲解

3. 应用场景 & 对应操作符详解

注:在使用RxJava 2操作符前,记得在项目的Gradle中添加依赖:

dependencies {      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'      compile 'io.reactivex.rxjava2:rxjava:2.0.7'      // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在}

3.1 连接被观察者 & 观察者

  • 需求场景
    即使得被观察者 & 观察者 形成订阅关系

  • 对应操作符

subscribe()

  • 作用
    订阅,即连接观察者 & 被观察者

  • 具体使用

observable.subscribe(observer);// 前者 = 被观察者(observable);后者 = 观察者(observer 或 subscriber)<-- 1. 分步骤的完整调用 -->//  步骤1: 创建被观察者 Observable 对象        Observable observable = Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                emitter.onNext(1);                emitter.onNext(2);                emitter.onNext(3);                emitter.onComplete();            }        });// 步骤2:创建观察者 Observer 并 定义响应事件行为        Observer observer = new Observer() {            // 通过复写对应方法来 响应 被观察者            @Override            public void onSubscribe(Disposable d) {                Log.d(TAG, "开始采用subscribe连接");            }            // 默认最先调用复写的 onSubscribe()            @Override            public void onNext(Integer value) {                Log.d(TAG, "对Next事件"+ value +"作出响应"  );            }            @Override            public void onError(Throwable e) {                Log.d(TAG, "对Error事件作出响应");            }            @Override            public void onComplete() {                Log.d(TAG, "对Complete事件作出响应");            }        };                // 步骤3:通过订阅(subscribe)连接观察者和被观察者        observable.subscribe(observer);<-- 2. 基于事件流的链式调用 -->        Observable.create(new ObservableOnSubscribe() {        // 1. 创建被观察者 & 生产事件            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                emitter.onNext(1);                emitter.onNext(2);                emitter.onNext(3);                emitter.onComplete();            }        }).subscribe(new Observer() {            // 2. 通过通过订阅(subscribe)连接观察者和被观察者            // 3. 创建观察者 & 定义响应事件的行为            @Override            public void onSubscribe(Disposable d) {                Log.d(TAG, "开始采用subscribe连接");            }            // 默认最先调用复写的 onSubscribe()            @Override            public void onNext(Integer value) {                Log.d(TAG, "对Next事件"+ value +"作出响应"  );            }            @Override            public void onError(Throwable e) {                Log.d(TAG, "对Error事件作出响应");            }            @Override            public void onComplete() {                Log.d(TAG, "对Complete事件作出响应");            }        });    }}
  • 测试结果

示意图

  • 扩展说明
<-- Observable.subscribe(Subscriber) 的内部实现 -->public Subscription subscribe(Subscriber subscriber) {    subscriber.onStart();    // 在观察者 subscriber抽象类复写的方法 onSubscribe.call(subscriber),用于初始化工作    // 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件    // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式    // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时}

3.2 线程调度

  • 需求场景
    快速、方便指定 & 控制被观察者 & 观察者 的工作线程
  • 对应操作符使用
    由于该部分内容较多 & 重要,所以已独立一篇文章,请看文章:Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)

3.3 延迟操作

  • 需求场景
    即在被观察者发送事件前进行一些延迟的操作

  • 对应操作符使用

delay()

  • 作用
    使得被观察者延迟一段时间再发送事件

  • 方法介绍
    delay() 具备多个重载方法,具体如下:

// 1. 指定延迟时间// 参数1 = 时间;参数2 = 时间单位delay(long delay,TimeUnit unit)// 2. 指定延迟时间 & 调度器// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器delay(long delay,TimeUnit unit,mScheduler scheduler)// 3. 指定延迟时间  & 错误延迟// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数delay(long delay,TimeUnit unit,boolean delayError)// 4. 指定延迟时间 & 调度器 & 错误延迟// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
  • 具体使用
Observable.just(1, 2, 3)                .delay(3, TimeUnit.SECONDS) // 延迟3s再发送,由于使用类似,所以此处不作全部展示                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

示意图

3.4 在事件的生命周期中操作

  • 需求场景
    在事件发送 & 接收的整个生命周期过程中进行操作

如发送事件前的初始化、发送事件后的回调请求等

  • 对应操作符使用

do()

  • 作用
    在某个事件的生命周期中调用
  • 类型
    do()操作符有很多个,具体如下:

示意图

  • 具体使用
Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onNext(3);                e.onError(new Throwable("发生错误了"));                 }               })                // 1. 当Observable每发送1次数据事件就会调用1次                .doOnEach(new Consumer>() {                    @Override                    public void accept(Notification integerNotification) throws Exception {                        Log.d(TAG, "doOnEach: " + integerNotification.getValue());                    }                })                // 2. 执行Next事件前调用                .doOnNext(new Consumer() {                    @Override                    public void accept(Integer integer) throws Exception {                        Log.d(TAG, "doOnNext: " + integer);                    }                })                // 3. 执行Next事件后调用                .doAfterNext(new Consumer() {                    @Override                    public void accept(Integer integer) throws Exception {                        Log.d(TAG, "doAfterNext: " + integer);                    }                })                // 4. Observable正常发送事件完毕后调用                .doOnComplete(new Action() {                    @Override                    public void run() throws Exception {                        Log.e(TAG, "doOnComplete: ");                    }                })                // 5. Observable发送错误事件时调用                .doOnError(new Consumer() {                    @Override                    public void accept(Throwable throwable) throws Exception {                        Log.d(TAG, "doOnError: " + throwable.getMessage());                    }                })                // 6. 观察者订阅时调用                .doOnSubscribe(new Consumer() {                    @Override                    public void accept(@NonNull Disposable disposable) throws Exception {                        Log.e(TAG, "doOnSubscribe: ");                    }                })                // 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止                .doAfterTerminate(new Action() {                    @Override                    public void run() throws Exception {                        Log.e(TAG, "doAfterTerminate: ");                    }                })                // 8. 最后执行                .doFinally(new Action() {                    @Override                    public void run() throws Exception {                        Log.e(TAG, "doFinally: ");                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

示意图

3.5 错误处理

  • 需求场景
    发送事件过程中,遇到错误时的处理机制

  • 对应操作符类型

示意图

  • 对应操作符使用

onErrorReturn()

  • 作用
    遇到错误时,发送1个特殊事件 & 正常终止

可捕获在它之前发生的异常

  • 具体使用
Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Throwable("发生错误了"));                 }               })                .onErrorReturn(new Function() {                    @Override                    public Integer apply(@NonNull Throwable throwable) throws Exception {                        // 捕捉错误异常                        Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );                        return 666;                        // 发生错误事件后,发送一个"666"事件,最终正常结束                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

示意图

onErrorResumeNext()

  • 作用
    遇到错误时,发送1个新的Observable

注:

  1. onErrorResumeNext()拦截的错误 = Throwable;若需拦截Exception请用onExceptionResumeNext()
  2. onErrorResumeNext()拦截的错误 = Exception,则会将错误传递给观察者的onError方法
  • 具体使用
Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Throwable("发生错误了"));                 }               })                .onErrorResumeNext(new Function>() {                    @Override                    public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {                        // 1. 捕捉错误异常                        Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );                        // 2. 发生错误事件后,发送一个新的被观察者 & 发送事件序列                        return Observable.just(11,22);                                            }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

示意图

onExceptionResumeNext()

  • 作用
    遇到错误时,发送1个新的Observable

注:

  1. onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
  2. onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法
  • 具体使用
Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                 }               })                .onExceptionResumeNext(new Observable() {                    @Override                    protected void subscribeActual(Observer<? super Integer> observer) {                        observer.onNext(11);                        observer.onNext(22);                        observer.onComplete();                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

示意图

retry()

  • 作用
    重试,即当出现错误时,让被观察者(Observable)重新发射数据
  1. 接收到 onError()时,重新订阅 & 发送事件
  2. ThrowableException都可拦截
  • 类型

共有5种重载方法

<-- 1. retry() -->// 作用:出现错误时,让被观察者重新发送数据// 注:若一直错误,则一直重新发送<-- 2. retry(long time) -->// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制// 参数 = 重试次数 <-- 3. retry(Predicate predicate) -->// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)// 参数 = 判断逻辑<--  4. retry(new BiPredicate) -->// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)<-- 5. retry(long time,Predicate predicate) -->// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制// 参数 = 设置重试次数 & 判断逻辑
  • 具体使用
<-- 1. retry() -->// 作用:出现错误时,让被观察者重新发送数据// 注:若一直错误,则一直重新发送Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);                 }               })                .retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });<-- 2. retry(long time) -->// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制// 参数 = 重试次数Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);                 }               })                .retry(3) // 设置重试次数 = 3次                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });<-- 3. retry(Predicate predicate) -->// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)// 参数 = 判断逻辑Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);                 }               })                // 拦截错误后,判断是否需要重新发送请求                .retry(new Predicate() {                    @Override                    public boolean test(@NonNull Throwable throwable) throws Exception {                        // 捕获异常                        Log.e(TAG, "retry错误: "+throwable.toString());                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束                        //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)                        return true;                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });<--  4. retry(new BiPredicate) -->// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);                 }               })                // 拦截错误后,判断是否需要重新发送请求                .retry(new BiPredicate() {                    @Override                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {                        // 捕获异常                        Log.e(TAG, "异常错误 =  "+throwable.toString());                        // 获取当前重试次数                        Log.e(TAG, "当前重试次数 =  "+integer);                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束                        //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)                        return true;                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });<-- 5. retry(long time,Predicate predicate) -->// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制// 参数 = 设置重试次数 & 判断逻辑Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);                 }               })                // 拦截错误后,判断是否需要重新发送请求                .retry(3, new Predicate() {                    @Override                    public boolean test(@NonNull Throwable throwable) throws Exception {                        // 捕获异常                        Log.e(TAG, "retry错误: "+throwable.toString());                        //返回false = 不重新重新发送数据 & 调用观察者的onError()结束                        //返回true = 重新发送请求(最多重新发送3次)                        return true;                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });

retryUntil()

  • 作用
    出现错误后,判断是否需要重新发送数据
  1. 若需要重新发送 & 持续遇到错误,则持续重试
  2. 作用类似于retry(Predicate predicate)
  • 具体使用
    具体使用类似于retry(Predicate predicate),唯一区别:返回 true 则不重新发送数据事件。此处不作过多描述

retryWhen()

  • 作用
    遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
  • 具体使用
Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onError(new Exception("发生错误了"));                e.onNext(3);            }        })                // 遇到error事件才会回调                .retryWhen(new Function, ObservableSource<?>>() {                                        @Override                    public ObservableSource<?> apply(@NonNull Observable throwableObservable) throws Exception {                        // 参数Observable中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型                        // 返回Observable<?> = 新的被观察者 Observable(任意类型)                        // 此处有两种情况:                            // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:                            // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:                        return throwableObservable.flatMap(new Function>() {                            @Override                            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {                                // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件                                // 该异常错误信息可在观察者中的onError()中获得                                 return Observable.error(new Throwable("retryWhen终止啦"));                                                                // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)                                 // return Observable.just(1);                            }                        });                    }                })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件"+ value  );                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应" + e.toString());                        // 获取异常错误信息                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });
  • 测试结果

新的Observable发送错误事件 = 原始Observable终止发送

新的Observable发送数据事件 = 原始Observable 持续重试

3.6 重复发送

  • 需求场景
    重复不断地发送被观察者事件

  • 对应操作符类型
    repeat() & repeatWhen()

repeat()

  • 作用
    无条件地、重复发送 被观察者事件

具备重载方法,可设置重复创建次数

  • 具体使用
// 不传入参数 = 重复发送次数 = 无限次        repeat();        // 传入参数 = 重复发送次数有限        repeatWhen(Integer int );// 注:  // 1. 接收到.onCompleted()事件后,触发重新订阅 & 发送  // 2. 默认运行在一个新的线程上        // 具体使用        Observable.just(1, 2, 3, 4)                .repeat(3) // 重复创建次数 =- 3次                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                        Log.d(TAG, "开始采用subscribe连接");                    }                                        @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件" + value);                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应");                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });

  • 测试结果

示意图

repeatWhen()

  • 作用
    有条件地、重复发送 被观察者事件

  • 原理
    将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable

  1. 若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
  2. 若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
  • 具体使用
 Observable.just(1,2,4).repeatWhen(new Function, ObservableSource<?>>() {            @Override            // 在Function函数中,必须对输入的 Observable进行处理,这里我们使用的是flatMap操作符接收上游的数据            public ObservableSource<?> apply(@NonNull Observable objectObservable) throws Exception {                // 将原始 Observable 停止发送事件的标识(Complete() /  Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)                // 以此决定是否重新订阅 & 发送原来的 Observable                // 此处有2种情况:                // 1. 若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable                // 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable                return objectObservable.flatMap(new Function>() {                    @Override                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {                        // 情况1:若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable                        return Observable.empty();                        // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()                        // return Observable.error(new Throwable("不再重新订阅事件"));                        // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。                        // 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable                        // return Observable.just(1);                       // 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件                    }                });            }        })                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                        Log.d(TAG, "开始采用subscribe连接");                    }                    @Override                    public void onNext(Integer value) {                        Log.d(TAG, "接收到了事件" + value);                    }                    @Override                    public void onError(Throwable e) {                        Log.d(TAG, "对Error事件作出响应:" + e.toString());                    }                    @Override                    public void onComplete() {                        Log.d(TAG, "对Complete事件作出响应");                    }                });   
  • 测试结果

新的Observable发送Complete 事件 = 原始Observable停止发送 & 不重新发送

新的Observable发送Error 事件 = 原始Observable停止发送 & 不重新发送

新的Observable发送其余事件 = 原始Observable重新发送

至此,RxJava 2中的功能性操作符讲解完毕。


4. 实际开发需求案例

  • 下面,我将 结合Retrofit & RxJava,讲解功能性操作符的3个实际需求案例场景:
    1. 线程操作(切换 / 调度 / 控制 )
    2. 轮询
    3. 发送网络请求时的差错重试机制

4.1 线程控制(切换 / 调度 )

  • 即,新开工作线程执行耗时操作;待执行完毕后,切换到主线程实时更新 UI
  • 具体请看文章:Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)

4.2 轮询

  • 需求场景说明

示意图

  • 下面,我将结合 RetrofitRxJava 用一个具体实例来实现轮询需求
  • 具体请看文章:Android RxJava 实际应用讲解:(有条件)网络请求轮询

4.3 发送网络请求时的差错重试机制

  • 需求场景说明

     

    示意图

  • 功能说明

     

    示意图

  • 下面我将结合 RetrofitRxJava 用一个具体实例来实现 发送网络请求时的 差错重试机制需求

  • 具体请看文章:Android RxJava 实际应用讲解:网络请求出错重连(结合Retrofit)


5. Demo地址

上述所有的Demo源代码都存放在:Carson_Ho的Github地址:RxJava2_功能性操作符


6. 总结

  • 下面,我将用一张图总结 RxJava2 中常用的功能性操作符

示意图

更多相关文章

  1. Android(安卓)UI Event Listener
  2. android基础之创建和解析xml
  3. Android(安卓)XML解析学习——Sax方式
  4. TextView 点击事件无效的完美解决
  5. androd 事件分发机制的初步理解
  6. 关于android的ListView的getCheckItemIds无法准确得到被选checkb
  7. Android(安卓)Native Crash崩溃及错误原因分析二-实战解决
  8. Android(安卓)>> 14. LiveData
  9. android中EditText与TextView共舞

随机推荐

  1. Android解决父控件拦截子控件手势滑动事
  2. android手机客户端上传文件,java servlet
  3. Android(安卓)模拟新浪微博-随便看看栏目
  4. android 多点触控
  5. Android(安卓)通讯录中用到的关于按名字
  6. linearLayout 和 relativeLayout的属性区
  7. android listview custom style 自定义样
  8. Android执行shell命令
  9. android调用setRequestedOrientation之后
  10. android加密解密完美教程