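Writing Your Own RxJava on Android
Reposted from "Android: Write Your Own RxJava (Part 1)"
and "Android: Write Your Own RxJava (Part 2)"
The key to using RxJava is keeping one distinction clear: the upstream emits events and the downstream receives them.
Once you know which operators act on the upstream and which act on the downstream, RxJava is little more than one extra wrapper layer around either side.
Source code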
CustomObservable
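To make comparison with RxJava easy, every class name is prefixed with Custom.
First, define two base interfaces for the downstream event receiver (the observer): CustomEmitter and CustomObserver. The two are nearly identical: both receive onNext, onComplete, and onError events, and CustomObserver additionally receives onStart.
CustomObserver is the one exposed to callers, while CustomEmitter is used internally as a wrapper.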
CustomEmitter
// Receives events downstream; this is also the emitter handed to the caller's subscribe() callback
public interface CustomEmitter<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}
CustomObserver
// Receives events downstream
public interface CustomObserver<T> {
    void onStart();
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}
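Next, define the base interfaces for the upstream side that runs the work and fires the callbacks (the observable): CustomObservableSource and CustomObservableOnSubscribe. They run the (potentially slow) subscribe method and emit, that is call, the onStart, onNext, onComplete, and onError events.
CustomObservableOnSubscribe is exposed to callers, while CustomObservableSource is used internally.
I personally dislike the terms "observer" and "observable" because I always mix them up, so from here on I say "upstream executor" and "downstream receiver".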
CustomObservableOnSubscribe
public interface CustomObservableOnSubscribe<T> {
    void subscribe(CustomEmitter<T> emitter);
}
CustomObservableSource
public interface CustomObservableSource<T> {
    void subscribe(CustomObserver<? super T> observer);
}
CustomObservable
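CustomObservable is the base class responsible for creating the concrete subclasses. Here create() builds a CustomObservableCreate.
public abstract class CustomObservable<T> implements CustomObservableSource<T> {

    // Factory method: wraps the caller's upstream logic in a CustomObservableCreate
    public static <T> CustomObservable<T> create(CustomObservableOnSubscribe<T> source) {
        return new CustomObservableCreate<>(source);
    }

    @Override
    public void subscribe(CustomObserver observer) {
        subscribeActual(observer);
    }

    // Each subclass decides how its upstream is actually run
    protected abstract void subscribeActual(CustomObserver observer);
}
CustomObservableCreate extends CustomObservable rather than implementing CustomObservableSource directly, so that what is handed back to callers is always a CustomObservable.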
CustomObservableCreate is the upstream executor; it wraps a CustomObservableOnSubscribe.
CustomCreateEmitter is the downstream receiver; it wraps a CustomObserver.
// Upstream: wraps the subscriber
public class CustomObservableCreate<T> extends CustomObservable<T> {
    private final CustomObservableOnSubscribe<T> subscriber;

    public CustomObservableCreate(CustomObservableOnSubscribe<T> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomCreateEmitter<T> emitter = new CustomCreateEmitter<T>(observer);
        observer.onStart();
        // This is where the (possibly slow) work actually runs
        subscriber.subscribe(emitter);
    }

    // Downstream: wraps the CustomObserver
    private static class CustomCreateEmitter<T> implements CustomEmitter<T> {
        private final CustomObserver<? super T> observer;

        CustomCreateEmitter(CustomObserver<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T o) {
            observer.onNext(o);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}
Test code:
public void testCreate() {
    CustomObservable.create(new CustomObservableOnSubscribe<String>() {
        @Override
        public void subscribe(CustomEmitter<String> emitter) {
            emitter.onNext("test create");
            emitter.onComplete();
        }
    }).subscribe(ExampleUnitTest.<String>getObserver());
}

// Shared observer that prints every event together with the current thread
public static <T> CustomObserver<T> getObserver() {
    return new CustomObserver<T>() {
        @Override
        public void onStart() {
            System.out.println("==== start " + Thread.currentThread() + " ====");
        }

        @Override
        public void onNext(T t) {
            System.out.println(Thread.currentThread() + " next: " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(Thread.currentThread() + " error: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("==== " + Thread.currentThread() + " complete ==== \n");
        }
    };
}
Test output:
==== start Thread[main,5,main] ====
Thread[main,5,main] next: test create
==== Thread[main,5,main] complete ====
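The map operator
One of RxJava's strengths is its rich set of operators, and map is among the most commonly used.
map acts on the downstream receiver (the observer): it transforms each result before passing it on.
Define a CustomFunction to do the data conversion:
public interface CustomFunction<T, R> {
    R apply(T t);
}
Add a map method to CustomObservable. It is an instance method, not a static one, because it wraps the current observable:
public <R> CustomObservable<R> map(CustomFunction<T, R> function) {
    return new CustomObservableMap<>(this, function);
}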
CustomObservableMap
public class CustomObservableMap<T, R> extends CustomObservable<R> {
    private final CustomObservableSource<T> source;
    private final CustomFunction<T, R> mapper;

    public CustomObservableMap(CustomObservableSource<T> source, CustomFunction<T, R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomMapObserver<T, R> mapObserver = new CustomMapObserver<T, R>(observer, mapper);
        source.subscribe(mapObserver);
    }

    private static class CustomMapObserver<T, R> implements CustomObserver<T> {
        private final CustomObserver<R> observer;
        private final CustomFunction<T, R> function;

        public CustomMapObserver(CustomObserver<R> observer, CustomFunction<T, R> function) {
            this.observer = observer;
            this.function = function;
        }

        @Override
        public void onStart() {
            observer.onStart();
        }

        @Override
        public void onNext(T result) {
            // Convert the upstream value before handing it downstream
            observer.onNext(function.apply(result));
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}
Test code:
public void testMap() {
    CustomObservable.create(new CustomObservableOnSubscribe<String>() {
        @Override
        public void subscribe(CustomEmitter<String> emitter) {
            emitter.onNext("test create");
            emitter.onComplete();
        }
    }).map(new CustomFunction<String, String>() {
        @Override
        public String apply(String s) {
            return "test map " + s;
        }
    }).subscribe(ExampleUnitTest.<String>getObserver());
}
Test output:
==== start Thread[main,5,main] ====
Thread[main,5,main] next: test map test create
==== Thread[main,5,main] complete ====
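To see why map is purely a downstream wrapper, the idea can be stripped down to two tiny interfaces. This is only a sketch: MiniObserver, MiniSource, and MapSketch are names invented for this illustration and are not part of the article's code, and java.util.function.Function stands in for CustomFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, stripped-down stand-ins for CustomObserver / CustomObservableSource.
interface MiniObserver<T> {
    void onNext(T value);
}

interface MiniSource<T> {
    void subscribe(MiniObserver<T> observer);
}

class MapSketch {
    // map() returns a new source whose subscribe() wraps the downstream observer:
    // every upstream value passes through mapper before reaching it.
    static <T, R> MiniSource<R> map(MiniSource<T> upstream, Function<T, R> mapper) {
        return downstream -> upstream.subscribe(value -> downstream.onNext(mapper.apply(value)));
    }

    // Chains two map() calls and collects what the final observer receives.
    static List<String> demo() {
        MiniSource<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
        };
        MiniSource<Integer> timesTen = map(source, n -> n * 10);
        MiniSource<String> labelled = map(timesTen, n -> "value " + n);

        List<String> received = new ArrayList<>();
        labelled.subscribe(received::add);
        return received; // [value 10, value 20]
    }
}
```

Each map() call adds one more observer wrapper, so a chain of operators is a chain of nested observers that values pass through from the outermost upstream to the final downstream.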
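The code above implements RxJava's basic Observable and the map operator. Next come RxJava's most important feature, thread switching, and some more complex operators: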
- subscribeOn()
- observeOn()
- from()
- zip()
- flatMap()
subscribeOn()
subscribeOn() acts on the upstream emission. First define a CustomScheduler that provides the interface for running tasks:
import java.util.concurrent.Executor;

public class CustomScheduler {
    private final Executor executor;

    public CustomScheduler(Executor executor) {
        this.executor = executor;
    }

    public CustomWorker createWorker() {
        return new CustomWorker(executor);
    }

    // A worker simply forwards tasks to the scheduler's executor
    public static class CustomWorker {
        private final Executor executor;

        public CustomWorker(Executor executor) {
            this.executor = executor;
        }

        public void schedule(Runnable runnable) {
            executor.execute(runnable);
        }
    }
}
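We can define all kinds of CustomScheduler to control which thread or thread pool runs the work. We could also build a scheduler that runs on the main thread, which would give the same effect as AndroidSchedulers.mainThread().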
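As a rough illustration of "all kinds of scheduler", the sketch below builds a few java.util.concurrent executors that could be passed to the CustomScheduler constructor. SketchExecutors and its method names are hypothetical and not part of the article's code. The Android main-thread variant is left as a comment because it needs the Android SDK.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical factory (not in the article): each method returns an Executor that
// could be handed to a scheduler constructor taking an Executor.
class SketchExecutors {
    // Runs tasks on a small shared pool, roughly the role of an I/O scheduler.
    static ExecutorService io() {
        return Executors.newFixedThreadPool(4);
    }

    // Runs tasks one at a time on a dedicated, named thread.
    static ExecutorService single(String threadName) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    }

    // Runs tasks immediately on the calling thread.
    static Executor immediate() {
        return Runnable::run;
    }

    // On Android, a main-thread executor could be written as:
    //   Handler handler = new Handler(Looper.getMainLooper());
    //   Executor main = handler::post;

    // Submits one task and reports the name of the thread that ran it.
    static String threadNameOf(ExecutorService executor) {
        try {
            return executor.submit(() -> Thread.currentThread().getName())
                    .get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

For example, SketchExecutors.threadNameOf(SketchExecutors.single("custom-io")) returns "custom-io", which confirms that the task ran on the dedicated thread rather than on the caller's.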
Next, add a subscribeOn() method to CustomObservable:
// CustomObservable
public CustomObservable<T> subscribeOn(CustomScheduler scheduler) {
    return new CustomObservableSubscribeOn<>(this, scheduler);
}
As in Part 1, this creates a CustomObservableSubscribeOn that wraps the upstream and the downstream. Its implementation is simple: it hands the upstream execution to the CustomScheduler's executor, and the downstream observer needs no extra work.
class CustomObservableSubscribeOn<T> extends CustomObservable<T> {
    private final CustomObservableSource<T> source;
    private final CustomScheduler scheduler;

    public CustomObservableSubscribeOn(CustomObservableSource<T> source, CustomScheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final CustomObserver observer) {
        final CustomSubscribeOnObserver<T> subscribeOnObserver = new CustomSubscribeOnObserver<T>(observer);
        CustomScheduler.CustomWorker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                // Run the upstream subscription on the CustomScheduler
                source.subscribe(subscribeOnObserver);
            }
        });
    }

    // Pass-through observer: forwards every event unchanged
    private static final class CustomSubscribeOnObserver<T> implements CustomObserver<T> {
        final CustomObserver<? super T> actual;

        CustomSubscribeOnObserver(CustomObserver<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onStart() {
            actual.onStart();
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable error) {
            actual.onError(error);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }
}
observeOn()
In essence this is the same as subscribeOn(); the difference is that observeOn() acts on the downstream observer.
Provide the observeOn() method:
// CustomObservable
public CustomObservable<T> observeOn(CustomScheduler scheduler) {
    return new CustomObservableObserveOn<>(this, scheduler);
}
Next create the CustomObservableObserveOn class. It only needs to hand the callbacks such as onNext to the CustomScheduler's executor.
class CustomObservableObserveOn<T> extends CustomObservable<T> {
    private final CustomObservableSource<T> source;
    private final CustomScheduler scheduler;

    public CustomObservableObserveOn(CustomObservableSource<T> source, CustomScheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomScheduler.CustomWorker worker = scheduler.createWorker();
        CustomObserverObserveOn<T> observerObserveOn = new CustomObserverObserveOn<T>(observer, worker);
        source.subscribe(observerObserveOn);
    }

    // Re-posts every downstream callback onto the worker
    private static class CustomObserverObserveOn<T> implements CustomObserver<T> {
        private final CustomObserver<T> observer;
        private final CustomScheduler.CustomWorker worker;

        public CustomObserverObserveOn(CustomObserver<T> observer, CustomScheduler.CustomWorker worker) {
            this.observer = observer;
            this.worker = worker;
        }

        @Override
        public void onStart() {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onStart();
                }
            });
        }

        @Override
        public void onNext(final T t) {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onNext(t);
                }
            });
        }

        @Override
        public void onError(final Throwable e) {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onError(e);
                }
            });
        }

        @Override
        public void onComplete() {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onComplete();
                }
            });
        }
    }
}
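The difference between the two operators is easiest to see by logging thread names. The following is a miniature sketch of the same idea, not the article's classes: TinyObserver, TinySource, and ThreadSwitchSketch are hypothetical names, and plain executors stand in for CustomScheduler. It uses single-thread executors on purpose, because each callback is scheduled as a separate task and a multi-thread pool would not guarantee that onNext events arrive in order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature types; the article's classes follow the same shape.
interface TinyObserver<T> {
    void onNext(T value);
    void onComplete();
}

interface TinySource<T> {
    void subscribe(TinyObserver<T> observer);
}

class ThreadSwitchSketch {
    // subscribeOn: the whole upstream subscribe() call runs on the executor.
    static <T> TinySource<T> subscribeOn(TinySource<T> upstream, Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // observeOn: only the callbacks delivered downstream hop to the executor.
    static <T> TinySource<T> observeOn(TinySource<T> upstream, Executor executor) {
        return observer -> upstream.subscribe(new TinyObserver<T>() {
            @Override public void onNext(T value) { executor.execute(() -> observer.onNext(value)); }
            @Override public void onComplete() { executor.execute(observer::onComplete); }
        });
    }

    // Returns log lines of the form "<event> on <thread name>".
    static List<String> demo() {
        ExecutorService up = Executors.newSingleThreadExecutor(r -> new Thread(r, "upstream"));
        ExecutorService down = Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        TinySource<String> source = observer -> {
            log.add("emit on " + Thread.currentThread().getName());
            observer.onNext("a");
            observer.onNext("b");
            observer.onComplete();
        };
        observeOn(subscribeOn(source, up), down).subscribe(new TinyObserver<String>() {
            @Override public void onNext(String value) {
                log.add("next " + value + " on " + Thread.currentThread().getName());
            }
            @Override public void onComplete() {
                log.add("complete on " + Thread.currentThread().getName());
                done.countDown();
            }
        });
        try {
            done.await(2, TimeUnit.SECONDS); // wait for the asynchronous pipeline to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            up.shutdown();
            down.shutdown();
        }
        return log;
    }
}
```

The log shows the emission happening on the "upstream" thread and every callback arriving on the "downstream" thread, which is the split the two operators are meant to provide.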
from()
RxJava's fromIterable operator emits the items of a list one at a time.
How can we implement a simple Observable that wraps multiple values? It is not hard: once subscribe() runs, call onNext() repeatedly to emit the data.
// CustomObservable
public static <T> CustomObservable<T> from(Iterable<T> values) {
    return new CustomObservableIterable<>(values);
}
Then build CustomObservableIterable:
class CustomObservableIterable<T> extends CustomObservable<T> {
    private final Iterable<T> valueIter;

    public CustomObservableIterable(Iterable<T> valueIter) {
        this.valueIter = valueIter;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomIterableObserver<T> iterableObserver = new CustomIterableObserver<>(valueIter, observer);
        CustomIterableSource source = new CustomIterableSource();
        source.subscribe(iterableObserver);
    }

    // Internal trigger source: its single onNext(null) tells the observer to start iterating
    private class CustomIterableSource implements CustomObservableSource {
        @Override
        public void subscribe(CustomObserver observer) {
            observer.onStart();
            observer.onNext(null);
            observer.onComplete();
        }
    }

    private static class CustomIterableObserver<T> implements CustomObserver<T> {
        private final Iterable<T> valueIter;
        private final CustomObserver<T> observer;

        CustomIterableObserver(Iterable<T> valueIter, CustomObserver<T> observer) {
            this.valueIter = valueIter;
            this.observer = observer;
        }

        @Override
        public void onStart() {
            this.observer.onStart();
        }

        @Override
        public void onNext(T t) {
            // t is only the trigger and is ignored; emit every stored value instead
            for (T value : valueIter) {
                this.observer.onNext(value);
            }
        }

        @Override
        public void onError(Throwable e) {
            this.observer.onError(e);
        }

        @Override
        public void onComplete() {
            this.observer.onComplete();
        }
    }
}
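zip
Explanations of zip online make it sound complicated, and I never quite followed them, but using zip is simple: it combines the results emitted by several upstreams and delivers the combined values to a single downstream observer. The important detail is that items from the upstreams are combined pairwise, by position.
Suppose task A produces [1, 2, 3]
and task B produces [1, 2].
If the combining rule is addition, what is the final result?
The result is [2, 4],
because B has no item to pair with A's 3, so A's 3 is discarded.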
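The pairing rule can be checked on plain lists before bringing observables into it. This is a minimal sketch with a hypothetical helper, zipLists, that pairs items by position and stops when the shorter input runs out; it illustrates only the rule, not the observable-based implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipPairingSketch {
    // Pairs items by position and stops as soon as either input runs out.
    static <A, B, R> List<R> zipLists(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            result.add(combiner.apply(a.next(), b.next()));
        }
        return result;
    }

    // The example from the text: A = [1, 2, 3], B = [1, 2], combined by addition.
    static List<Integer> demo() {
        List<Integer> taskA = Arrays.asList(1, 2, 3);
        List<Integer> taskB = Arrays.asList(1, 2);
        return zipLists(taskA, taskB, (x, y) -> x + y); // [2, 4]
    }
}
```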
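Implementing zip is more involved. As before, start with a public static method:
// CustomObservable (needs java.util.Arrays and java.util.List)
public static <T, U, R> CustomObservable<R> zip(final CustomObservableSource<T> o1,
                                                final CustomObservableSource<U> o2,
                                                CustomBiFunction<T, U, R> mapper) {
    List<CustomObservableSource<?>> list = Arrays.asList(o1, o2);
    // Raw types are used here because the sources carry different element types
    CustomFunction<Object[], R> arrayFunc = new CustomFunctions.Array2Func(mapper);
    return new CustomObservableZip(list, arrayFunc);
}

public interface CustomBiFunction<T, U, R> {
    R apply(T t, U u);
}
Here the CustomBiFunction is converted into a CustomFunction that takes an Object[], which is what CustomObservableZip consumes: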
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class CustomObservableZip<T, U, R> extends CustomObservable<R> {
    List<CustomObservableSource<T>> sources;
    CustomFunction<Object[], R> mapper;

    public CustomObservableZip(List<CustomObservableSource<T>> sources, CustomFunction<Object[], R> mapper) {
        this.sources = sources;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        ZipCoordinator<T, R> zipCoordinator = new ZipCoordinator<T, R>(observer, sources, mapper);
        zipCoordinator.subscribe();
    }

    // Not thread-safe: assumes all sources emit on the same thread
    static final class ZipCoordinator<T, R> {
        CustomObserver<R> actual;
        List<CustomObservableSource<T>> sources;
        List<ZipObserver<T, R>> observers;
        CustomFunction<Object[], R> mapper;
        int size;
        boolean isFinish;

        ZipCoordinator(CustomObserver<R> observer, List<CustomObservableSource<T>> sources,
                       CustomFunction<Object[], R> mapper) {
            this.actual = observer;
            this.sources = sources;
            this.mapper = mapper;
            this.size = sources.size();
            this.observers = new ArrayList<>(size);
            this.isFinish = false;
        }

        public void subscribe() {
            actual.onStart();
            // Create every ZipObserver first, so drain() always sees the full list
            for (int i = 0; i < size; i++) {
                observers.add(new ZipObserver<T, R>(this));
            }
            for (int i = 0; i < size; i++) {
                sources.get(i).subscribe(observers.get(i));
            }
        }

        void drain() {
            if (isFinish) {
                return;
            }
            boolean canMerge = true;
            boolean isDone = true;
            for (ZipObserver<T, R> observer : observers) {
                if (observer.error != null) {
                    // Forward the first error downstream and stop
                    isFinish = true;
                    actual.onError(observer.error);
                    return;
                }
                if (!observer.isDone) {
                    isDone = false;
                }
                if (observer.queue.isEmpty()) {
                    canMerge = false;
                }
            }
            if (canMerge) {
                // Every queue has a value: take one from each and combine them
                List<T> mergeList = new ArrayList<>(size);
                for (ZipObserver<T, R> observer : observers) {
                    mergeList.add(observer.queue.poll());
                }
                actual.onNext(mapper.apply(mergeList.toArray()));
            }
            if (isDone) {
                // All sources completed: finish exactly once
                isFinish = true;
                actual.onComplete();
            }
        }
    }

    static class ZipObserver<T, R> implements CustomObserver<T> {
        boolean isDone;
        ZipCoordinator<T, R> parent;
        Queue<T> queue;
        Throwable error;

        public ZipObserver(ZipCoordinator<T, R> parent) {
            this.parent = parent;
            this.queue = new LinkedList<>();
            this.isDone = false;
        }

        @Override
        public void onStart() {
        }

        @Override
        public void onNext(T o) {
            queue.add(o);
            parent.drain();
        }

        @Override
        public void onError(Throwable e) {
            isDone = true;
            error = e;
            parent.drain();
        }

        @Override
        public void onComplete() {
            isDone = true;
            parent.drain();
        }
    }
}
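RxJava's real zip implementation is considerably more complex than the above.
Briefly, my approach gives every CustomObservableSource its own ZipObserver, which stores that source's results in a queue. Each time a source calls onNext, the coordinator checks whether every ZipObserver's queue holds a result; if so, it takes one result from each queue, combines them, and emits the combined value.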
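The zip() factory method relies on CustomFunctions.Array2Func, whose source is not shown in this article. The sketch below is only a guess at what such an adapter might look like: Array2FuncSketch is a hypothetical name, the real class may differ, and the two interfaces are repeated so the block compiles on its own.

```java
// Repeated from the article so the sketch compiles on its own.
interface CustomFunction<T, R> {
    R apply(T t);
}

interface CustomBiFunction<T, U, R> {
    R apply(T t, U u);
}

// Assumed shape of the adapter; the real CustomFunctions.Array2Func may differ.
class Array2FuncSketch<T, U, R> implements CustomFunction<Object[], R> {
    private final CustomBiFunction<T, U, R> mapper;

    Array2FuncSketch(CustomBiFunction<T, U, R> mapper) {
        this.mapper = mapper;
    }

    @Override
    @SuppressWarnings("unchecked")
    public R apply(Object[] values) {
        if (values.length != 2) {
            throw new IllegalArgumentException("expected 2 values, got " + values.length);
        }
        // The array holds one value per source, in subscription order.
        return mapper.apply((T) values[0], (U) values[1]);
    }

    // Adapts an addition function and applies it to one pair of values.
    static int demo() {
        CustomFunction<Object[], Integer> sum =
                new Array2FuncSketch<Integer, Integer, Integer>((a, b) -> a + b);
        return sum.apply(new Object[] {2, 3}); // 5
    }
}
```

Going through Object[] lets the coordinator stay independent of how many sources there are, at the cost of unchecked casts inside the adapter.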
flatMap
public <R> CustomObservable<R> flatMap(CustomFunction<T, CustomObservableSource<R>> function) {
    return new CustomObservableFlatMap<>(this, function);
}
The difference between flatMap and map is that flatMap converts each value into an Observable, while map converts each value into a value of another type.
public class CustomObservableFlatMap<T, R> extends CustomObservable<R> {
    private final CustomObservableSource<T> source;
    private final CustomFunction<T, CustomObservableSource<R>> mapper;

    public CustomObservableFlatMap(CustomObservableSource<T> source,
                                   CustomFunction<T, CustomObservableSource<R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomFlatMapObserver<T, R> flatMapObserver = new CustomFlatMapObserver<T, R>(observer, mapper);
        source.subscribe(flatMapObserver);
    }

    private static class CustomFlatMapObserver<T, R> implements CustomObserver<T> {
        private final CustomObserver<R> observer;
        private final CustomFunction<T, CustomObservableSource<R>> mapper;

        public CustomFlatMapObserver(CustomObserver<R> observer,
                                     CustomFunction<T, CustomObservableSource<R>> mapper) {
            this.observer = observer;
            this.mapper = mapper;
        }

        @Override
        public void onStart() {
            observer.onStart();
        }

        @Override
        public void onNext(T t) {
            // Turn the value into an inner source and forward its items downstream
            CustomObservableSource<R> source = mapper.apply(t);
            InnerObserver<R> innerObserver = new InnerObserver<>(observer);
            source.subscribe(innerObserver);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            // Completes as soon as the outer source does, so this is only
            // correct when the inner sources emit synchronously
            observer.onComplete();
        }

        private static class InnerObserver<R> implements CustomObserver<R> {
            private final CustomObserver<R> observer;

            InnerObserver(CustomObserver<R> observer) {
                this.observer = observer;
            }

            @Override
            public void onStart() {
                // Ignored: the downstream has already been started by the outer source
            }

            @Override
            public void onNext(R result) {
                observer.onNext(result);
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                // Ignored: only the outer source completes the downstream
            }
        }
    }
}
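To make the contrast with map concrete, here is a stripped-down sketch in which each number expands into two strings. ItemObserver, ItemSource, and FlatMapSketch are hypothetical names used only for this illustration, and the sketch is synchronous, so it sidesteps the completion ordering question that a real flatMap has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, stripped-down stand-ins for CustomObserver / CustomObservableSource.
interface ItemObserver<T> {
    void onNext(T value);
}

interface ItemSource<T> {
    void subscribe(ItemObserver<T> observer);
}

class FlatMapSketch {
    // Each upstream value becomes an inner source; everything the inner source
    // emits is forwarded to the same downstream observer.
    static <T, R> ItemSource<R> flatMap(ItemSource<T> upstream, Function<T, ItemSource<R>> mapper) {
        return downstream -> upstream.subscribe(value -> mapper.apply(value).subscribe(downstream));
    }

    // Expands 1 and 2 into two strings each and collects the flattened result.
    static List<String> demo() {
        ItemSource<Integer> numbers = observer -> {
            observer.onNext(1);
            observer.onNext(2);
        };
        Function<Integer, ItemSource<String>> expand = n -> observer -> {
            observer.onNext(n + "a");
            observer.onNext(n + "b");
        };
        ItemSource<String> flat = flatMap(numbers, expand);

        List<String> received = new ArrayList<>();
        flat.subscribe(received::add);
        return received; // [1a, 1b, 2a, 2b]
    }
}
```

With map, two inputs would always give two outputs; with flatMap the number of outputs depends on what each inner source emits.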