RxJava 以观察者模式为骨架,在 2.0 中依旧如此

不过此次更新中,出现了两种观察者模式:

  • Observable ( 被观察者 ) / Observer ( 观察者 )
  • Flowable (被观察者)/ Subscriber (观察者)

导入项目:

// RxJavaimplementation "io.reactivex.rxjava2:rxjava:2.2.7"implementation "io.reactivex.rxjava2:rxandroid:2.1.1"

 

Observable

1.被观察者 (小说),开始连载,读者订阅小说(subscribe),那么小说更新后 就会推送消息给读者。

       // 步骤1:创建被观察者 Observable & 生产事件//        // 即 连载小说,读者订阅小说,小说更新推送给读者//        //  1. 创建被观察者 Observable 对象  小说novel        Observable novel = Observable.create(new ObservableOnSubscribe() {            // 2. 在复写的subscribe()里定义需要发送的事件            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                // 通过 ObservableEmitter类对象产生事件并通知观察者                // ObservableEmitter类介绍                // a. 定义:事件发射器                // b. 作用:定义需要发送的事件 & 向观察者发送事件                emitter.onNext("连载1"); //开始连载小说                emitter.onNext("连载2");                emitter.onNext("连载3");                emitter.onComplete();            }        });////        //步骤2:创建观察者 Observer 并 定义响应事件行为    读者 reader        //读者订阅小说        Observer reader = new Observer() {            // 通过复写对应方法来 响应 被观察者            @Override            public void onSubscribe(Disposable d) {                d.dispose();//    取消订阅 连载小说更新的时候就不会再推送给读者                Log.e(TAG, "开始采用subscribe连接");            }            // 默认最先调用复写的 onSubscribe()            @Override            public void onNext(String value) {                Log.e(TAG, "对Next事件"+ value +"作出响应"  );            }            @Override            public void onError(Throwable e) {                Log.e(TAG, "对Error事件作出响应");            }            @Override            public void onComplete() {                Log.e(TAG, "对Complete事件作出响应");            }        };        // 步骤3:通过订阅(subscribe)连接观察者和被观察者        // 即 小说被读者订阅        novel.subscribe(reader);

 2.异步

    //异步        Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                drawableRes = new ArrayList<>(); //.......从数据库中取出id资源数组操作                emitter.onNext(drawableRes);                emitter.onComplete();            }        }).flatMap(new Function>() {            @Override            public ObservableSource apply(List list) throws Exception {                return Observable.fromIterable(list);            }        }).subscribeOn(Schedulers.io())//在IO线程执行数据库处理操作                .observeOn(AndroidSchedulers.mainThread())//在UI线程显示图片                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {                        Log.d("----","onSubscribe");                    }                    @Override                    public void onNext(Integer integer) {//                        imageView.setImageResource(integer);//拿到id,加载图片//                        Log.d("----",integer+"");                    }                    @Override                    public void onError(Throwable e) {                        Log.d("----",e.toString());                    }                    @Override                    public void onComplete() {                        Log.d("----","onComplete");                    }                });
 Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter emitter) throws Exception {                emitter.onNext("rxjava");                emitter.onComplete();            }        }).subscribeOn(Schedulers.newThread())  //事件产生的线程。  Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。 //Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。                .observeOn(AndroidSchedulers.mainThread())  // 事件消费的线程。 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行                .subscribe(new Observer() {                    @Override                    public void onSubscribe(Disposable d) {//                        d.dispose();//取消订阅                        Log.d("----", "onSubscribe");                    }                    @Override                    public void onNext(String s) {                        Log.d("----", s);                    }                    @Override                    public void onError(Throwable e) {                        Log.d("----", "onError");                    }                    @Override                    public void onComplete() {                        Log.d("----", "onComplete");                    }                });        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }

 

Flowable


Flowable是RxJava2.x中新增的类,专门用于应对背压(Backpressure)问题,但这并不是RxJava2.x中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则会造成点击两次的效果。
我们知道,在RxJava1.x中背压控制是由Observable完成的,使用如下:

  Observable.range(1,10000)            .onBackpressureDrop()            .subscribe(integer -> Log.d("JG",integer.toString()));



而在RxJava2.x中将其独立了出来,取名为Flowable。因此,原先的Observable已经不具备背压处理能力。
通过Flowable我们可以自定义背压处理策略。


测试Flowable例子如下:

  Flowable.create(new FlowableOnSubscribe() {            @Override            public void subscribe(FlowableEmitter e) throws Exception {                for(int i=0;i<10000;i++){                    e.onNext(i);                }                e.onComplete();            }        }, FlowableEmitter.BackpressureMode.ERROR) //指定背压处理策略,抛出异常                .subscribeOn(Schedulers.computation())                .observeOn(Schedulers.newThread())                .subscribe(new Consumer() {                    @Override                    public void accept(Integer integer) throws Exception {                        Log.d("JG", integer.toString());                        Thread.sleep(1000);                    }                }, new Consumer() {                    @Override                    public void accept(Throwable throwable) throws Exception {                        Log.d("JG",throwable.toString());                    }                });



或者可以使用类似RxJava1.x的方式来控制。

 

  Flowable.range(1,10000)                .onBackpressureDrop()                .subscribe(integer -> Log.d("JG",integer.toString()));


其中还需要注意的一点在于,Flowable并不是订阅就开始发送数据,而是需等到执行Subscription#request才能开始发送数据。当然,使用简化subscribe订阅方法会默认指定Long.MAX_VALUE。手动指定的例子如下:

        Flowable.range(1,10).subscribe(new Subscriber() {            @Override            public void onSubscribe(Subscription s) {                s.request(Long.MAX_VALUE);//设置请求数            }            @Override            public void onNext(Integer integer) {            }            @Override            public void onError(Throwable t) {            }            @Override            public void onComplete() {            }        });


 

更多相关文章

  1. 刚学android,练手写的一个播放器
  2. 【Android】软键盘弹出收起事件监听
  3. Android事件模型之interceptTouchEvnet ,onTouchEvent关系正解
  4. :Android(安卓)GestureDetector手势识别类
  5. Android(安卓)顶级视图DecorView的前世今生
  6. Fragment Management
  7. 【android】binder机制-servicemanager
  8. android 广播大全 Intent Action 事件
  9. Android快速实现断点续传的方法

随机推荐

  1. SparseArray 那些事儿(带给你更细致的分析
  2. Android(安卓)7.0 SEAndroid(安卓)app权
  3. Android中如何实现定时任务
  4. 下载Android单个项目源码的方法
  5. View、Window、WindowManager-vsync信号
  6. Android适配之dimens适配终极攻略(实际项
  7. Android(安卓)Watchdog框架看门狗解析、
  8. Android(安卓)依赖注入:Dagger 实例讲解(De
  9. android parcelable 以及android studio
  10. Android(安卓)SugarORM(1)