转自Android 写一个属于自己的Rxjava(一)
Android 写一个属于自己的Rxjava(二)
Rxjava的使用重点在于分清楚:上游发射事件,下游接收事件

只要分清楚哪些操作符是作用在上游,哪些作用在下游,在此基础上对上游或者下游封装多一层就成了Rxjava。
源码地址

CustomObservable

为了对比rxjava,所有的类名前面都加了Custom表示自定义的意思

定义两个下游的接收事件的基类(观察者):
CustomEmitterCustomObserver ,其实二者是基本一样的接口

负责接收onStartonNextonCompleteonError事件

其中CustomObserver是给暴露给外界使用的,而是CustomEmitter封装在内部使用

CustomEmitter

// 下游接收事件,并负责暴露给外部的发射事件者public interface CustomEmitter<T> {    void onNext(T value);    void onError(Throwable e);    void onComplete();}

CustomObserver

// 下游接收事件public interface CustomObserver<T> {    void onStart();    void onNext(T t);    void onError(Throwable e);    void onComplete();}

定义一个上游的执行回调事件的基类(被观察者)
CustomObservableSourceCustomObservableOnSubscribe

负责执行subscribe(耗时)方法,并发射(调用)onStartonNextonCompleteonError事件

其中CustomObservableOnSubscribe是给暴露给外界使用的,而CustomObservableSource是封装在内部使用

个人不喜欢观察者和被观察者的说法,总是分不清;

所以以下用上游执行者下游接收者

CustomObservableOnSubscribe

public interface CustomObservableOnSubscribe<T> {    void subscribe(CustomEmitter<T> emitter);}

CustomObservableSource

public interface CustomObservableSource<T> {    void subscribe(CustomObserver<? super T> observer);}

CustomObservable
CustomObservable 就是一个基类,负责创建各个对应的子类,这里的create()创建了CustomObservableCreate

public abstract class CustomObservable<T> implements CustomObservableSource {    public static <T> CustomObservable<T> create(CustomObservableOnSubscribe<T> source) {        return new CustomObservableCreate(source);    }       @Override    public void subscribe(CustomObserver observer) {        subscribeActual(observer);    }    protected abstract void subscribeActual(CustomObserver observer);}

这里之所以CustomObservableCreate继承CustomObservable而不是CustomObservableSource,是为了保证对外抛出去的是CustomObservable

  • CustomObservableCreate就是上游的执行事件者,负责封装一层CustomObservableOnSubscribe
  • CustomCreateEmitter就是下游的接收事件者,负责封装一层CustomObserver
// 上游封装了subscriberpublic class CustomObservableCreate<T> extends CustomObservable {    private CustomObservableOnSubscribe<T> subscriber;    public CustomObservableCreate(CustomObservableOnSubscribe<T> subscriber) {        this.subscriber = subscriber;    }    @Override    protected void subscribeActual(CustomObserver observer) {        CustomCreateEmitter emitter = new CustomCreateEmitter<T>(observer);        observer.onStart();        // 真正执行耗时方法        subscriber.subscribe(emitter);    }    // 下游封装了CustomObserver    private static class CustomCreateEmitter<T> implements CustomEmitter<T> {        private CustomObserver<? super T> observer;        CustomCreateEmitter(CustomObserver<? super T> observer) {            this.observer = observer;        }        @Override        public void onNext(T o) {            observer.onNext(o);        }        @Override        public void onError(Throwable e) {            observer.onError(e);        }        @Override        public void onComplete() {            observer.onComplete();        }    }}

测试代码:

public void testCreate() {   CustomObservable.create(new CustomObservableOnSubscribe<String>() {       @Override       public void subscribe(CustomEmitter<String> emitter) {           emitter.onNext("test create");           emitter.onComplete();       }   }).subscribe(ExampleUnitTest.<String>getObserver());}public static <T> CustomObserver getObserver() {    CustomObserver<T> observer = new CustomObserver<T>() {        @Override        public void onStart() {            System.out.println("==== start " + Thread.currentThread() + " ====");        }        @Override        public void onNext(T t) {            System.out.println(Thread.currentThread() + " next: " + t);        }        @Override        public void onError(Throwable e) {            System.out.println(Thread.currentThread() + " error: " + e);        }        @Override        public void onComplete() {            System.out.println("==== " + Thread.currentThread() + " complete ==== \n");        }    };    return observer;}

测试结果:

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test create
==== Thread[main,5,main] complete ====

操作符 map

Rxjava的强大有一方面就在于它丰富的操作符,其中常用之一的就是map

map的作用是在下游的接收事件者(观察者),将返回的结果进行转换映射

定义一个CustomFunction负责数据转换

public interface CustomFunction<T, R> {    R apply(T t);}

CustomObservable定义多一个map的静态方法

public <R> CustomObservable<R> map(CustomFunction<T, R> function) {     return new CustomObservableMap(this, function);}

CustomObservableMap

public class CustomObservableMap<R, T> extends CustomObservable {    private CustomObservableSource<T> source;    private CustomFunction<T, R> mapper;    public CustomObservableMap(CustomObservableSource<T> source, CustomFunction<T, R> mapper) {        this.source = source;        this.mapper = mapper;    }    @Override    protected void subscribeActual(CustomObserver observer) {        CustomMapObserver<T, R> mapObserver = new CustomMapObserver(observer, mapper);        source.subscribe(mapObserver);    }    private static class CustomMapObserver<T, R> implements CustomObserver<T> {        private CustomObserver<R> observer;        private CustomFunction<T, R> function;        public CustomMapObserver(CustomObserver<R> observer, CustomFunction<T, R> function) {            this.observer = observer;            this.function = function;        }        @Override        public void onStart() {            observer.onStart();        }        @Override        public void onNext(T result) {            // 做结果数据转换映射            observer.onNext(function.apply(result));        }        @Override        public void onError(Throwable e) {            observer.onError(e);        }        @Override        public void onComplete() {            observer.onComplete();        }    }}

测试代码:

public void testMap() {    CustomObservable.create(new CustomObservableOnSubscribe<String>() {        @Override        public void subscribe(CustomEmitter<String> emitter) {            emitter.onNext("test create");            emitter.onComplete();        }    }).map(new CustomFunction<String, String>() {        @Override        public String apply(String s) {            return "test map " + s;        }    }).subscribe(ExampleUnitTest.<String>getObserver());}

测试结果

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test map test create
==== Thread[main,5,main] complete ====

上面实现了Rxjava基本的Observable和map操作符的实现,接下来需要实现Rxjava最重要的线程切换和复杂的操作符

  • subscribeOn()
  • observeOn()
  • from()
  • zip()
  • flatmap()

subscribeOn()

subscribeOn()作用在上游的发射,先定义一个CustomScheduler,提供执行任务的接口

public class CustomScheduler {    private final Executor executor;    public CustomScheduler(Executor executor) {        this.executor = executor;    }    public CustomWorker createWorker() {        return new CustomWorker(executor);    }    public static class CustomWorker{        private final Executor executor;        public CustomWorker(Executor executor) {            this.executor = executor;        }        public void schedule(Runnable runnable) {            executor.execute(runnable);        }    }}

我们可以定义多种多样的CustomScheduler,指定执行在什么线程或者线程池。我们还可以造一个执行在主线程的Scheduler,就可以达到AndroidSchedulers.mainThread()一样的效果。

继续在CustomObservable中提供subscribeOn()的方法:

    // CustomObservable    public CustomObservable<T> subscribeOn(CustomScheduler scheduler) {        return new CustomObservableSubscribeOn(this, scheduler);    }

跟上篇文章一样,生成了CustomObservableSubscribeOn来封装上游和下游。CustomObservableSubscribeOn的实现也很简单,只是将上游的执行扔进CustomScheduler线程池里面执行,下游Observer不需要做什么动作。

class CustomObservableSubscribeOn<T> extends CustomObservable<T> {    private CustomObservableSource<T> source;    private CustomScheduler scheduler;    public CustomObservableSubscribeOn(CustomObservableSource<T> source, CustomScheduler scheduler) {        this.source = source;        this.scheduler = scheduler;    }    @Override    protected void subscribeActual(final CustomObserver observer) {        final CustomSubscribeOnObserver subscribeOnObserver = new CustomSubscribeOnObserver(observer);        CustomScheduler.CustomWorker worker = scheduler.createWorker();        worker.schedule(new Runnable() {            @Override            public void run() {                // 将任务执行扔进CustomScheduler                source.subscribe(subscribeOnObserver);            }        });    }    private static final class CustomSubscribeOnObserver<T>  implements CustomObserver<T> {        final CustomObserver<? super T> actual;        CustomSubscribeOnObserver(CustomObserver<? super T> actual) {            this.actual = actual;        }        @Override        public void onStart() {            actual.onStart();        }        @Override        public void onNext(T t) {            actual.onNext(t);        }        @Override        public void onError(Throwable error) {            actual.onError(error);        }        @Override        public void onComplete() {            actual.onComplete();        }    }}

ObserveOn()

其实本质跟subscribeOn是一样的,区别在于ObserveOn()作用在下游的observer中。

提供ObserverOn()方法

    // CustomObservable    public CustomObservable<T> observeOn(CustomScheduler scheduler) {        return new CustomObservableObserveOn(this, scheduler);    }

继续新建CustomObservableObserveOn类,只需要将回调事件onNext等扔进CustomScheduler的线程池就完成任务了。

class CustomObservableObserveOn<T> extends CustomObservable<T> {    private CustomObservableSource<T> source;    private CustomScheduler scheduler;    public CustomObservableObserveOn(CustomObservableSource source, CustomScheduler scheduler) {        this.source = source;        this.scheduler = scheduler;    }    @Override    protected void subscribeActual(CustomObserver observer) {        CustomScheduler.CustomWorker worker = scheduler.createWorker();        CustomObserverObserveOn observerObserveOn = new CustomObserverObserveOn<T>(observer, worker);        source.subscribe(observerObserveOn);    }    private static class CustomObserverObserveOn<T> implements CustomObserver<T> {        private CustomObserver<T> observer;        private CustomScheduler.CustomWorker worker;        public CustomObserverObserveOn(CustomObserver<T> observer, CustomScheduler.CustomWorker worker) {            this.observer = observer;            this.worker = worker;        }        @Override        public void onStart() {            this.worker.schedule(new Runnable() {                @Override                public void run() {                    observer.onStart();                }            });        }        @Override        public void onNext(final T t) {            this.worker.schedule(new Runnable() {                @Override                public void run() {                    observer.onNext(t);                }            });        }        @Override        public void onError(final Throwable e) {            this.worker.schedule(new Runnable() {                @Override                public void run() {                    observer.onError(e);                }            });        }        @Override        public void onComplete() {            this.worker.schedule(new Runnable() {                @Override                public void run() {                    observer.onComplete();                }            });        }    }}

from()

Rxjava用fromIterable 操作符可以逐次发射list的中的数据。

怎么简单实现一个封装多个值的Observable。其实也不难,就是执行subscribeOn()后,多次调用onNext()发射数据。

   // CustomObservable    public static <T> CustomObservable<T> from(Iterable<T> values) {        return new CustomObservableIterable<>(values);    }

继续造CustomObservableIterable

class CustomObservableIterable<T> extends CustomObservable {    private Iterable<T> valueIter;    public CustomObservableIterable(Iterable<T> valueIter) {        this.valueIter = valueIter;    }    @Override    protected void subscribeActual(CustomObserver observer) {        CustomIterableObserver<T> iterableObserver = new CustomIterableObserver<>(valueIter, observer);        CustomInterableSource source = new CustomInterableSource();        source.subscribe(iterableObserver);    }    private class CustomInterableSource implements CustomObservableSource {        @Override        public void subscribe(CustomObserver observer) {            observer.onStart();            observer.onNext(null);            observer.onComplete();        }    }    private static class CustomIterableObserver<T> implements CustomObserver<T> {        private Iterable<T> valueIter;        private CustomObserver<T> observer;        CustomIterableObserver(Iterable<T> valueIter, CustomObserver<T> observer) {            this.valueIter = valueIter;            this.observer = observer;        }        @Override        public void onStart() {            this.observer.onStart();        }        @Override        public void onNext(T t) {            for (T value : valueIter) {                this.observer.onNext(value);            }        }        @Override        public void onError(Throwable e) {            this.observer.onError(e);        }        @Override        public void onComplete() {            this.observer.onComplete();        }    }}

zip

网上把zip说得好复杂,每次我都没看明白,其实zip用起来很简单,就是将多个上游的发射请求执行结果混合在一起,统一发射给同一个下游observer。但是要注意的是,多个上游的是一一对应混合的。

任务A的执行的结果是[1, 2, 3]
任务B的执行的结果是[1, 2]
混合规则是加法,那么最后的结果是什么?
结果是:[2, 4]
因为B没有结果跟A的3对应,所以抛弃了A的3。

zip的实现比较复杂,同样先提供一个对外的静态方法

// CustomObservablepublic static <T, U, R> CustomObservable<R> zip(final CustomObservableSource<T> o1,                                                    final CustomObservableSource<U> o2,                                                    CustomBiFunction<T, U, R> mapper) {        List<CustomObservableSource<?>> list = Arrays.asList(o1, o2);        CustomFunction<Object[], R> arrayFunc = new CustomFunctions.Array2Func(mapper);        return new CustomObservableZip(list, arrayFunc);    }public interface CustomBiFunction<T, U, R> {    R apply(T t, U u);}

我们将CustomBitFunction转换成CustomFunction,更有通配性,简单理解就是表示多个CustomObservableSource转换成R结果。至于如何转换,直接看上面的github源码

public class CustomObservableZip<T, U, R> extends CustomObservable<T> {    List<CustomObservableSource<T>> sources;    CustomFunction<Object[], R> mapper;    public CustomObservableZip(List<CustomObservableSource<T>> sources, CustomFunction<Object[], R> mapper) {        this.sources = sources;        this.mapper = mapper;    }    @Override    protected void subscribeActual(CustomObserver observer) {        ZipCoordinator zipCoordinator = new ZipCoordinator(observer, sources, mapper);        zipCoordinator.subscribe();    }    static final class ZipCoordinator<T, R> {        CustomObserver<R> actual;        List<CustomObservableSource<T>> sources;        List<ZipObserver<T, R>> observers;        CustomFunction<Object[], R> mapper;        int size;        boolean isFinish;        ZipCoordinator(CustomObserver<R> observer,                       List<CustomObservableSource<T>> sources,                       CustomFunction<Object[], R> mapper) {            this.actual = observer;            this.sources = sources;            this.mapper = mapper;            this.size = sources.size();            this.observers = new ArrayList<>(size);            this.isFinish = false;        }        public void subscribe() {            actual.onStart();            for (int i = 0; i<size; i++) {                ZipObserver observer = new ZipObserver<T, R>(this);                observers.add(observer);            }            for (int i = 0; i<size; i++) {                sources.get(i).subscribe(observers.get(i));            }        }        void drain() {            if (isFinish) {                return;            }            boolean canMerge = true;            boolean isDone = true;            for (ZipObserver<T, R> observer: observers) {                if (!observer.isDone) {                    isDone = false;                }                if (observer.queue.isEmpty()) {                    canMerge = false;                }            }            if (canMerge) {                List<T> mergeList = new ArrayList<>(size);                for (ZipObserver<T, R> observer: observers) {                    T t = observer.queue.poll();                    mergeList.add(t);                }                actual.onNext(mapper.apply(mergeList.toArray()));            }            if (isDone) {                actual.onComplete();            }        }    }    static class ZipObserver<T, R> implements CustomObserver<T> {        boolean isDone;        ZipCoordinator<T, R> parent;        Queue<T> queue;        Throwable error;        public ZipObserver(ZipCoordinator parent) {            this.parent = parent;            this.queue = new LinkedList<>();            this.isDone = false;        }        @Override        public void onStart() {        }        @Override        public void onNext(T o) {            queue.add(o);            parent.drain();        }        @Override        public void onError(Throwable e) {            isDone = true;            error = e;            parent.drain();        }        @Override        public void onComplete() {            isDone = true;            parent.drain();        }    }}

事实上Rxjava的zip实现比上面复杂多一些。
简单说下我的实现方式,就是为每一个CustomObservableSource提供一个ZipObserver,内部存储着自己的计算结果,每次执行完任务调用onNext的时候,就去看下是不是所有的zipObserver的队列都是有计算结果的,如果是,就将结果混合之后发射出去。

flatmap

    public <R> CustomObservable<R> flatMap(CustomFunction<T, CustomObservableSource<R>> function) {        return new CustomObservableFlatMap(this, function);    }

其实flatmap跟map的区别在于,前者是将值转换成一个Observable,而后者将值转换成另外种类型的值。

public class CustomObservableFlatMap<T, R> extends CustomObservable {    private CustomObservableSource<T> source;    private CustomFunction<T, CustomObservableSource<R>> mapper;    public CustomObservableFlatMap(CustomObservableSource<T> source, CustomFunction<T, CustomObservableSource<R>> mapper) {        this.source = source;        this.mapper = mapper;    }    @Override    protected void subscribeActual(CustomObserver observer) {        CustomFlatMapObserver<T, R> flatMapObserver = new CustomFlatMapObserver(observer, mapper);        source.subscribe(flatMapObserver);    }    private static class CustomFlatMapObserver<T, R> implements CustomObserver<T> {        private CustomObserver<R> observer;        private CustomFunction<T, CustomObservableSource<R>> mapper;        public CustomFlatMapObserver(CustomObserver<R> observer, CustomFunction<T, CustomObservableSource<R>> mapper) {            this.observer = observer;            this.mapper = mapper;        }        @Override        public void onStart() {            observer.onStart();        }        @Override        public void onNext(T t) {            CustomObservableSource<R> source = mapper.apply(t);            InnerObserver<R> innerObserver = new InnerObserver<>(observer);            source.subscribe(innerObserver);        }        @Override        public void onError(Throwable e) {            observer.onError(e);        }        @Override        public void onComplete() {            observer.onComplete();        }        private static class InnerObserver<R> implements CustomObserver<R> {            private CustomObserver<R> observer;            InnerObserver(CustomObserver<R> observer) {                this.observer = observer;            }            @Override            public void onStart() {            }            @Override            public void onNext(R result) {                observer.onNext(result);            }            @Override            public void onError(Throwable e) {                observer.onError(e);            }            @Override            public void onComplete() {            }        }    }}

更多相关文章

  1. Android中MotionEvent的来源和ViewRootImpl
  2. Android(安卓)应用开发笔记 - UI开发详解
  3. Android压力测试之Monkey
  4. Android开发——菜单ActionBar
  5. Android(安卓)事件处理(—)(附源码)
  6. Android(安卓)View的onClick事件监听
  7. Android入门教程(八)之-----简单的Button事件响应综合提示控件To
  8. Android(安卓)RxJava操作符详解系列: 创建操作符
  9. Android:使用MediaPlayer播放本地音乐

随机推荐

  1. Android-SQLite3基本操作指令集合
  2. 配置阿里云RepoForge 镜像
  3. 怎么画好透视关系?动漫透视画法
  4. PHP实现自动加载机制
  5. PHP中使用extract函数
  6. PHP中token的生成案例
  7. php artisan命令信息列举
  8. 绘画到底怎么入门?零基础绘画入门技巧!
  9. php的Snoopy类案例讲解
  10. 意派Epub360丨如何做好金三银四招聘H5?来