Android之RxJava2
16lz
2021-01-25
RxJava 以观察者模式为骨架,在 2.0 中依旧如此。
不过此次更新中,出现了两种观察者模式:
- Observable ( 被观察者 ) / Observer ( 观察者 )
-
Flowable (被观察者)/ Subscriber (观察者)
导入项目:
// RxJavaimplementation "io.reactivex.rxjava2:rxjava:2.2.7"implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
Observable
1.被观察者 (小说),开始连载,读者订阅小说(subscribe),那么小说更新后 就会推送消息给读者。
// 步骤1:创建被观察者 Observable & 生产事件// // 即 连载小说,读者订阅小说,小说更新推送给读者// // 1. 创建被观察者 Observable 对象 小说novel Observable novel = Observable.create(new ObservableOnSubscribe() { // 2. 在复写的subscribe()里定义需要发送的事件 @Override public void subscribe(ObservableEmitter emitter) throws Exception { // 通过 ObservableEmitter类对象产生事件并通知观察者 // ObservableEmitter类介绍 // a. 定义:事件发射器 // b. 作用:定义需要发送的事件 & 向观察者发送事件 emitter.onNext("连载1"); //开始连载小说 emitter.onNext("连载2"); emitter.onNext("连载3"); emitter.onComplete(); } });//// //步骤2:创建观察者 Observer 并 定义响应事件行为 读者 reader //读者订阅小说 Observer reader = new Observer() { // 通过复写对应方法来 响应 被观察者 @Override public void onSubscribe(Disposable d) { d.dispose();// 取消订阅 连载小说更新的时候就不会再推送给读者 Log.e(TAG, "开始采用subscribe连接"); } // 默认最先调用复写的 onSubscribe() @Override public void onNext(String value) { Log.e(TAG, "对Next事件"+ value +"作出响应" ); } @Override public void onError(Throwable e) { Log.e(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.e(TAG, "对Complete事件作出响应"); } }; // 步骤3:通过订阅(subscribe)连接观察者和被观察者 // 即 小说被读者订阅 novel.subscribe(reader);
2.异步
//异步 Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { drawableRes = new ArrayList<>(); //.......从数据库中取出id资源数组操作 emitter.onNext(drawableRes); emitter.onComplete(); } }).flatMap(new Function>() { @Override public ObservableSource apply(List list) throws Exception { return Observable.fromIterable(list); } }).subscribeOn(Schedulers.io())//在IO线程执行数据库处理操作 .observeOn(AndroidSchedulers.mainThread())//在UI线程显示图片 .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.d("----","onSubscribe"); } @Override public void onNext(Integer integer) {// imageView.setImageResource(integer);//拿到id,加载图片// Log.d("----",integer+""); } @Override public void onError(Throwable e) { Log.d("----",e.toString()); } @Override public void onComplete() { Log.d("----","onComplete"); } });
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("rxjava"); emitter.onComplete(); } }).subscribeOn(Schedulers.newThread()) //事件产生的线程。 Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。 //Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 .observeOn(AndroidSchedulers.mainThread()) // 事件消费的线程。 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行 .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) {// d.dispose();//取消订阅 Log.d("----", "onSubscribe"); } @Override public void onNext(String s) { Log.d("----", s); } @Override public void onError(Throwable e) { Log.d("----", "onError"); } @Override public void onComplete() { Log.d("----", "onComplete"); } }); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
Flowable
Flowable是RxJava2.x中新增的类,专门用于应对背压(Backpressure)问题,但这并不是RxJava2.x中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则会造成点击两次的效果。
我们知道,在RxJava1.x中背压控制是由Observable完成的,使用如下:
Observable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
而在RxJava2.x中将其独立了出来,取名为Flowable。因此,原先的Observable已经不具备背压处理能力。
通过Flowable我们可以自定义背压处理策略。
测试Flowable例子如下:
Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureMode.ERROR) //指定背压处理策略,抛出异常 .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d("JG", integer.toString()); Thread.sleep(1000); } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.d("JG",throwable.toString()); } });
或者可以使用类似RxJava1.x的方式来控制。
Flowable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
其中还需要注意的一点在于,Flowable并不是订阅就开始发送数据,而是需等到执行Subscription#request才能开始发送数据。当然,使用简化subscribe订阅方法会默认指定Long.MAX_VALUE。手动指定的例子如下:
Flowable.range(1,10).subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE);//设置请求数 } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
更多相关文章
- 刚学android,练手写的一个播放器
- 【Android】软键盘弹出收起事件监听
- Android事件模型之interceptTouchEvnet ,onTouchEvent关系正解
- :Android(安卓)GestureDetector手势识别类
- Android(安卓)顶级视图DecorView的前世今生
- Fragment Management
- 【android】binder机制-servicemanager
- android 广播大全 Intent Action 事件
- Android快速实现断点续传的方法