有关 RxJava2 的学习笔记
额外参考文章:Android 响应式编程 RxJava2 完全解析


一、给初学者的RxJava2.0教程系列

1、创建的 ObservableObserver 的实例,不是一次性的(暂时需要加上某些前提,如在一般情况下),可以被重复使用,如下面代码所示:

Observable observable = Observable.create(observableEmitter -> {    observableEmitter.onNext(1);    observableEmitter.onNext(2);    observableEmitter.onNext(3);    observableEmitter.onNext(4);    observableEmitter.onComplete();});Observer observer = new Observer() {    @Override    public void onSubscribe(Disposable disposable) {        System.out.println("onSubscribe");    }    @Override    public void onNext(Integer integer) {        System.out.println("====> "+integer);    }    @Override    public void onError(Throwable throwable) {    }    @Override    public void onComplete() {        System.out.println("complete");    }};observable.subscribe(observer);observable.subscribe(observer);

对应的信息会打印两次,这就表明两个实例都是用来表示处理过程的,而不是表明一种一次性的状态的,observable 一被订阅(即使用 subscribe() 方法)就会触发对应逻辑的发射。


2、subscribeOn() 指定的是 Observable 发射事件的线程,即 create、just 等发射数据时的线程(多次指定其所属线程只有第一次指定的有效,因为 create、just 等都是互斥的), 如果没有指定默认为当前线程。
observeOn() 指定的是 Observer 接收事件的线程,包括map 等方法 所处的线程(多次指定其所属线程是可以的, 也就是说每调用一次 observeOn() , 其所属线程就会切换一次。如果没有指定,默认为 Observable 发射事件的线程。
例如想让 map()Schedulers.newThread() 线程中执行,则需要在使用 map() 前通过 observeOn(Schedulers.newThread()) 设置即可)。


3、《给初学者的RxJava2.0教程(九)》中:
FlowableEmitter 的 requested() 方法的结果表示未满足的订阅者通过 subscription.request() 方法请求的个数。

在订阅者与被订阅者同步的情况下,如果订阅者请求数据的个数小于被订阅者发送的数据个数,被多余的发射出的数据就被缓存起来,但是被缓存的数据个数超出了设置的背压策略的缓存个数时,就会出现异常 MissingBackpressureException(比如背压策略为 BackpressureStrategy.ERROR 时,最多能缓存 128 个被发射的数据)。

在订阅者与被订阅者异步的情况下,在被订阅者的线程中会内置单独的 request() 实例方法进行请求(会在一个特殊的时机触发该方法),将被订阅者的数据发射到缓存中,然后订阅者中的涉及的 request() 方法是另外一个实例的,不同于那个内置,且该实例的 request() 方法会从缓存中请求数据(暂时认为从缓存中请求,因为不知道当订阅者请求数据的速度大于发射数据速度时,被发射的数据会不会先经过缓存在被订阅者接收,不过猜想应该会直接由被订阅者发送给订阅者,因为这样效率相对搞高一点)
当背压策略为 BackpressureStrategy.ERROR 时,当订阅者从缓存中请求了第 96 个数据时,就会触发内置单独的 request() 实例方法请求被订阅者将下一波数据发射到缓存中。(假设被订阅者能够满足非常大的数据发射量,那如果一次性请求150个,此时缓存中有 128 个,从缓存中取了 128 个后,是不是剩下的直接由被订阅者发射到订阅者中,不经过缓存,然后150个之后的又会被缓存起来)


二、这可能是最好的RxJava 2.x 入门教程

1、教程(三)
(1) 在使用 timerinterval 会在默认的子线程中进行。

(2) doOnNext 用于让订阅者在接收到数据之前对发射对数据进行某些处理,如缓存等,但是不建议改变本来的数据,如果在该方法中改变了数据,则订阅者收到的数据也会被改变。

(3) skip 是跳过前 x 个数据,不是每隔 x 个数据。

(4) take 表示最多接受 x 个数据,因此可以使用该方法取前 x 个数据。

(5) just 是接受的是什么类型的参数,就会将该参数以对应的类型发射出去。如 just(new int[]{...}) 则订阅者会接受到数组类型的参数,而不是将数组中的元素逐个发射,这个与 1.x 中的 from() 存在区别,但是需要注意,如果是 just(1,2,3) 则订阅者会收到三个 Integer 类型的参数。


2、教程(四)
(1) debounce 用于除去小于特定时间跨度的数据,如下:

    Observable.create(new ObservableOnSubscribe<Integer>() {            @Override            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {                emitter.onNext(1); // skip                Thread.sleep(400);                emitter.onNext(2); // deliver                Thread.sleep(600);                emitter.onNext(3); // skip                Thread.sleep(100);                emitter.onNext(4); // deliver                Thread.sleep(605);                emitter.onNext(5); // deliver                Thread.sleep(510);                emitter.onComplete();            }        }).debounce(500, TimeUnit.MILLISECONDS)                .subscribeOn(Schedulers.io())                .subscribe(new Consumer<Integer>() {                    @Override                    public void accept(@NonNull Integer integer) throws Exception {                        Log.d(TAG, "debounce :" + integer + "\n");                    }                });


这里要注意啊,发射了 1 之后,因为再发射 2 的时候,会间隔 400ms,小于设定的 500ms,所以 1 会被除去,3 也同理。而不是某种惯性的认为的那样,对,就是非正确结果想的那样

(2) defer 才开始看的时候,一直不能理解其 在订阅后才生效的(也就是延迟) 的特性,后来在看了 【译】使用RxJava实现延迟订阅 才有所理解。

public class Test {    public static void main(String args[]) {        SomeType instance = new SomeType();        Observable value = instance.valueObservable();        instance.setValue("Some Value");        value.subscribe(System.out::println);    }}class SomeType {    private String value = "Init value";    public void setValue(String value) {        this.value = value;    }    public Observable valueObservable() {        return Observable.just(value);    }}

在非延迟的情况下,就像上面代码的直接使用 just 打印结果Init value,这种情况下,如果 value 没有被初始化,即

private String value = null;

还会因为 RxJava2 不支持 null 而报异常。

而使用 defer ,

Observable.defer(() -> Observable.just(value));

结果就会像预期那样打印为 Some Vaule

用本人理解后的话说就是,defer 发射的数据每次都会对应数据实例被发射时(即被订阅)的最新值而且还需要注意每次被订阅都会创建一个新的Observable对象。这就对性能来说会有一定的影响了。如果对性能有要求的话,可以参照那篇译文中的自己的封装一下create() 方法。

public Observable valueObservable() {    return Observable.create(observableEmitter -> {        observableEmitter.onNext(value);        observableEmitter.onComplete();    });}

(3) last 的参数是表示没有值的时候的默认值,如下:

    Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter e) throws Exception {                e.onComplete();            }        }).last(4).subscribe(new Consumer() {            @Override            public void accept(Integer integer) throws Exception {                Log.d("TAG", ""+integer);            }        });

输出为 4,这里需要注意的是 RxJava2 发射的数据支持 null 了。


(4) window 参照 RxJava操作符(二)Transforming Observables 一文的第六点,比较详细一点。
个人的暂时理解是将数据按照该方法的第一个参数 timespan(时间间隔) 进行分类,分类之后以多组数据分别以新的 Observable 发射。


3、教程(五)
(1) concat 可以做到有序的发射两个甚至多个 Observable 的发射事件(只接受相同泛型的参数),并且只有前一个 Observable 终止(onComplete) 后开始下一个 Observable。如下:

Observable o1 = Observable.create(new ObservableOnSubscribe() {    @Override    public void subscribe(ObservableEmitter e) throws Exception {        Log.d("TAG", "o1 " + Thread.currentThread());        e.onNext(1);        e.onNext(2);        e.onNext(3);        e.onComplete();//注意    }}).subscribeOn(Schedulers.newThread());Observable o2 = Observable.create(new ObservableOnSubscribe() {    @Override    public void subscribe(ObservableEmitter e) throws Exception {        Log.d("TAG", "o2 " + Thread.currentThread());        e.onNext(4);        e.onNext(5);        e.onNext(6);        e.onComplete();    }}).subscribeOn(AndroidSchedulers.mainThread());Observable.concat(o1, o2)        .subscribe(new Consumer() {    @Override    public void accept(Object o) throws Exception {        Log.d("TAG", "subscribe " + Thread.currentThread());        Log.d("TAG", o.toString());    }});

当每个 Observable 都指定了不同都线程时,则在 subscribe() 中则就因为不同线程发射都数据而切换到不同到线程。就像上面到代码,会有如下打印:

因为被处理时处于不同都线程,所以是并行处理,但是会按照顺序来。

而如果指定了 subscribe() 的线程(即在该方法前加上 observeOn(Schedulers.newThread())),则会在指定的线程中处理数据:

且所有数据都在同一线程中被处理,所以是串行处理。

更多相关文章

  1. SpringBoot 2.0 中 HikariCP 数据库连接池原理解析
  2. 一句话锁定MySQL数据占用元凶
  3. Android(安卓)ListView的创建以及数据适配器的使用
  4. android开发流程
  5. Andorid App Widget Framework分析之一:AppWidgetService
  6. SQLite数据库浅谈
  7. android不同activity共享数据的几种方法
  8. Android(安卓)CRC16计算产生校验码
  9. android 报错column '_id' does not exist的解决方案

随机推荐

  1. android 在新建短信时,加入名称为","(英文
  2. Android(安卓)Studio2.0 教程从入门到精
  3. Android调用系统短信功能发送短信
  4. android通过耳机控制音乐播放器
  5. Android(安卓)View获取焦点
  6. React Native调用Android原生组件
  7. App启动时,白屏和黑屏闪现的问题
  8. Android(安卓)项目规范
  9. Android之SharedPreferences详解
  10. Android(安卓)开发规范总结