RxJava1的使用介绍
16lz
2021-12-04
1.RxJava是什么?2.观察者模式?3.RxJava中的观察者模式?4.创建观察者。5.创建被观察者。6.Subscribe 订阅。7.变换操作符。8.线程调度。1.RxJava是什么? RxJava是一个可异步处理事件的框架。 在Android中异步一词很重要。Android规定在处理耗时操作时,需要开启一个子线程处理任务,这样会出现子线程和主线程通信的问题。 Android提供了Handler,AsynTask,HandlerThread等方式处理异步问题。 当业务繁多的时候,使用这些方式会变得代码臃肿,逻辑不清晰。这个时候就用到了Rxjava。 RxJava是一种基于可扩展的观察者模式实现的。2.观察者模式? 是一种对象间存在的一对一或一对多的关系。这种关系是依赖性的。当一个对象的状态发生变化时,所有依赖它的对象将收到通知。 例子:用户关注了一个公众号,当公众号有新的文章发表时,用户会收到提醒。 在这个模式中存在四个角色: 2.1 抽象被观察者:抽象出被观察者所具有的行为。比如可以添加用户,删除用户,通知用户等。 2.2 抽象观察者:抽象出观察者所具有的行为。比如收到通知等。 2.3 具体被观察者:具有抽象被观察者行为的实物。比如网站。 2.4 具体观察者:具有抽象观察者行为的实物。比如人。在代码中实现抽象观察者:
public interface InterfaceUserObserver { void update(String message); }
抽象被观察者:
public interface InterfaceWebsiteObserverable { void register(User user);//接受用于关注 void remove(User user);//删除用户 void notifyUser();//通知用户 }
具体观察者:
public class User implements InterfaceUserObserver { private String name; public User(String name){ this.name = name; } @Override public void update(String message) { System.out.println(name+"收到了一条消息:"+message); } }
具体被观察者:
public class Website implements InterfaceWebsiteObserverable{ private List users; private String message; public Website(){ users = new ArrayList<>(); } @Override public void register(User user) { users.add(user); } @Override public void remove(User user) { users.remove(user); } @Override public void notifyUser() { for(User user:users){ user.update(message); } } public void setMessage(String msg){ message = msg; notifyUser(); }}
Website website = new Website(); website.register(new User("小A")); website.register(new User("小B")); website.register(new User("小C")); website.register(new User("小D")); website.setMessage("今日看点:周杰伦的新歌发布啦。");打印:小A收到了一条消息:今日看点:周杰伦的新歌发布啦。小B收到了一条消息:今日看点:周杰伦的新歌发布啦。小C收到了一条消息:今日看点:周杰伦的新歌发布啦。小D收到了一条消息:今日看点:周杰伦的新歌发布啦。
3.RxJava中的观察者模式存在四个角色:3.1 观察者 Observer/Subscriber3.2 被观察者 Observable3.3 订阅 subscribe3.4 事件 event观察者Observer 和 被观察者Observable的联系是通过 subscribe订阅的。在有需要的时候被观察者会发送事件通知观察者。4.创建观察者观察者Observer决定了事件触发的时候将有怎样的行为。
Observer observer = new Observer(){ @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } };onCompleted():当被观察者调用onCompleted时触发。onError():当被观察者调用onError时触发。onNext():当被观察者调用onNext时触发。
另外一种方式:
Subscriber subscriber = new Subscriber(){ @Override public void onStart() { super.onStart(); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; subscriber.unsubscribe(); subscriber.isUnsubscribed();
Subscriber 相对 Observer来说会多出三个方法:onStart():在事件开始发送前调用。这个方法只会在subscribe发生的线程执行。isUnsubscribed():判断是否订阅被取消。unsubscribe():取消订阅。5.创建被观察者被观察者Observable决定什么时候触发事件以及触发怎样的事件。5.1通过create创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("事件A"); subscriber.onNext("事件B"); subscriber.onCompleted(); } });observable.subscribe(observer);打印:onNext:事件AonNext:事件BonCompleted
5.2通过just创建被观察者
Observable observable1 = Observable.just("hello");自动调用onNext()和onCompleted()observable1.subscribe(observer);打印:onNext:helloonCompleted
5.3通过from创建被观察者from()方法将传入的数组或Iterable拆分成具体对象后,自动调用OnNext方法一次发送。
Observable observable2 = Observable.from(new String[]{ "a","b","c","d","e" });List list = new ArrayList<>(); list.add("aa"); list.add("bb"); list.add("cc"); list.add("dd"); list.add("ee");Observable observable3 = Observable.from(list);observable3.subscribe(observer);打印:onNext:bbonNext:cconNext:ddonNext:eeonCompleted
5.4 通过defer()创建被观察者当观察者订阅时才创建Observable,并且对应每个观察者都会创建一个新的Observable。
Observable observable4 = Observable.defer(new Func0>() { @Override public Observable call() { Observable childObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("开始订阅事件了"); } }); return childObservable; } }); observable4.subscribe(new Action1() { @Override public void call(String s) { System.out.println("观察者1订阅 "+s); } }); observable4.subscribe(new Action1() { @Override public void call(String s) { System.out.println("观察者2订阅 "+s); } });
使用defer()方法,每当产生新的订阅事件时都会生成一个新的Observable对象。5.5 interval()创建一个按固定时间间隔发送整数序列的Observable,可作为轮询器使用。
Observable observable5 = Observable.interval(1,TimeUnit.SECONDS); observable5.subscribe(new Action1() { @Override public void call(Long aLong) { System.out.println("时间间隔:"+aLong); } });
5.6 timer()延迟一定的时间后才发送事件。
Observable observable = Observable.timer(2000,TimeUnit.MILLISECONDS); Subscriber subscriber = new Subscriber() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"); } @Override public void onNext(Long aLong) { System.out.println("onNext 收到消息啦"); } };
6.Subscribe 订阅一般会使用Observable.subscribe()方法将被观察者和观察者联系起来。6.1 不完整定义回调 Action接口,常用的有Action0和Action1Action0:它只有一个方法call(),这个方法无参数无返回值;Action1:它只有一个方法call(T param),这个方法有一个参数无返回值;ActionX:表示这个call()这个方法有X个参数。
Action0 onCompleteAction = new Action0() { @Override public void call() { System.out.println("onCompleteAction"); } }; Action1 onNextAction = new Action1(){ @Override public void call(String s) { System.out.println("onNext:"+s); } }; Action1 onErrorAction = new Action1(){ @Override public void call(Throwable throwable) { System.out.println("onErrorAction"); } };//只处理onNext事件 observable.subscribe(onNextAction);//只处理onNext和onError事件 observable.subscribe(onNextAction,onErrorAction);//只处理onNext,onError和onCompleted事件 observable.subscribe(onNextAction,onErrorAction,onCompleteAction);
7.变换操作符变换操作符的作用是将事件序列中的对象或整个系列进行加工处理,转换成不同的事件。7.1 mapmap操作符通过制定一个Func1对象,将原Observable对象转换为另一个Observable对象并发送。
List courses = new ArrayList<>(); courses.add(new Course("西游记")); courses.add(new Course("红楼梦")); courses.add(new Course("水浒传")); courses.add(new Course("三国演义")); Observable.from(courses) .map(new Func1(){ @Override public String call(Course course) { return course.getCourseName(); } }).subscribe(new Action1() { @Override public void call(String s) { System.out.println("你要读:"+s); } });打印:你要读:西游记你要读:红楼梦你要读:水浒传你要读:三国演义List courses = new ArrayList<>(); courses.add(new Course("西游记")); courses.add(new Course("红楼梦")); courses.add(new Course("水浒传")); courses.add(new Course("三国演义")); Observable.from(courses) .map(new Func1(){ @Override public String call(Course course) { return course.getCourseName(); } }).subscribe(new Action1() { @Override public void call(String s) { System.out.println("你要读:"+s); } });打印:你要读:西游记你要读:红楼梦你要读:水浒传你要读:三国演义
传入一个course对象,返回的是String类型的数据。FuncX和ActionX是类似的,不同的是FuncX是有返回值的。例如:Func1和Action1,Func1有一个参数,有返回值;Action1有一个参数,没有返回值。7.2 flatMapflatMap也是可以做变换的。不同于map的是,flatMap返回的是一个Observable类型。
List students = new ArrayList<>(); Student studentA = new Student("小明",new ArrayList()); studentA.getCourses().add(new Course("西游记")); studentA.getCourses().add(new Course("红楼梦")); studentA.getCourses().add(new Course("水浒传")); studentA.getCourses().add(new Course("三国演义")); Student studentB = new Student("小王",new ArrayList()); studentB.getCourses().add(new Course("语文")); studentB.getCourses().add(new Course("数学")); studentB.getCourses().add(new Course("英语")); Student studentC = new Student("小强",new ArrayList()); studentC.getCourses().add(new Course("C语言")); studentC.getCourses().add(new Course("Java")); studentC.getCourses().add(new Course("C++")); students.add(studentA); students.add(studentB); students.add(studentC); Observable.from(students) .flatMap(new Func1>() { @Override public Observable call(Student student) { return Observable.from(student.getCourses()); } }).subscribe(new Action1() { @Override public void call(Course course) { System.out.println(course.getCourseName()); } });
7.3 变换的原理RxJava 提供很多中变换,但本质上都是对事件序列的处理再发送。这个过程都离不开lift()。
public final Observable map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap(func));}public final Observable lift(final Operator<? extends R, ? super T> operator) { return new Observable(new OnSubscribeLift(onSubscribe, operator));}
在调用的lift方法时,返回一个新的Observable对象。而这个新的Observable对象需要的参数是一个新的OnSubscribe。当有lift()时,(1)lift()会创建一个新的Observable,对应的也会有一个新的OnSubscribe。(2)当调用lift()后的Observable.subscribe()的时候,使用的是lift()创建的新的Observable,于是它所触发的onSubscribe.call(subscriber),是新Observable中的新OnSubscribe。(3)这个新OnSubscribe的call()方法中的OnSubscribe,就是原始Observable的onSubscribe,在这个call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一个新的Subscriber,然后利用这个新Subscriber向原始Observable进行订阅。
总结:新创建的一个Subscriber与原始的Observable的Subscriber关联起来。当产生事件订阅时,新的Observable订阅了事件,然后通知原始Observable开始发送事件,原始Observable将事件发送给新的Subscriber,经过处理后发送给原始的Observable的Subscriber。当多个lift()时:
8.线程调度RxJava的线程调度使用的时Scheduler类。Scheduler的ApiSchedulers.immediate(): 直接在当前线程运行。Schedulers.newThread():启用新线程,并在新线程执行操作。Schedulers.io():启动I/O操作。Schedulers.computation(): 计算所使用的 Scheduler。Android专用的AndroidSchedulers.mainThread():它执行的操作将在Android主线程运行。例子:
Observable.from(students) .flatMap(new Func1>() { @Override public Observable call(Student student) { return Observable.from(student.getCourses()); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1() { @Override public void call(Course course) { System.out.println(course.getCourseName()); } });
subscriberOn():指定Observable(被观察者)所在的线程,叫做事件产生线程;observeOn():指定Observer(观察者)所运行在的线程,叫做事件消费线程。
更多相关文章
- Thread
- Android处理ListView的条目长按事件
- Android事件分发机制完全解析,带你从源码的角度彻底理解(上)
- android RxJava(RxAndroid)的简单使用
- Android更新UI的方法
- Android监听底层事件的机制总结
- Android自动化测试工具——Monkey
- Android(安卓)任务、进程和线程
- 《Android(安卓)Dev Guide》系列教程8:用户界面