打算从这篇博客开始整理一下Observable的那些api用法

这篇博客的用法主要有 :  just , from, interval, timer, range, never, empty, error, defer

just

public static  Observable just(final T value)public static  Observable just(T... t1)

just会把传入的参数值数组依次输出,具体发布原理上一篇已经分析过了

Observable.just(1,2,3).subscribe(new Action1() {            @Override            public void call(Integer integer) {            }});

这里会依次输出1,2,3

from

public static  Observable from(T[] array)public static  Observable from(Future<? extends T> future)public static  Observable from(Future<? extends T> future, long timeout, TimeUnit unit)public static  Observable from(Future<? extends T> future, Scheduler scheduler)public static  Observable from(Iterable<? extends T> iterable)

方法有点多我们分别分析一下

public static  Observable from(T[] array)

这个方法和上面的just用法一致,其实just的方法就是封装成数组后调用from方法

public static  Observable from(Future<? extends T> future)

这个方法传入的是一个Future对象,就是一个线程操作对象,看下最终回调的调用方法

 public void call(Subscriber<? super T> subscriber) {            subscriber.add(Subscriptions.create(new Action0() {                @Override                public void call() {                    // If the Future is already completed, "cancel" does nothing.                    that.cancel(true);                }            }));            try {                    ...                if (subscriber.isUnsubscribed()) {                    return;                }                T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);                subscriber.onNext(value);                subscriber.onCompleted();            } catch (Throwable e) {                  ...            }        }

其中that就是我们传入的future对象,这块会调用future的get方法返回相应的值,再把这个值发布到我们的subscriber中,并在最终结束后用cancel方法取消任务

注:这个方法虽然可以开辟新的线程去处理数据,但是最终回调还是会回到subscribe的线程,而且future的get的方法可能会阻塞主线程,所以就有了后面的超时参数方法

public static  Observable from(Future<? extends T> future, long timeout, TimeUnit unit)

这个用法和上面一致,不同的是多了两个参数,一个是超时的值,一个是超时的时间类型;如果在超时时间内,futuer并没有获得返回值,就会抛出超时异常,也就是会直接回调subscriber的onError方法;和上面的方法一样,这个也可能会阻塞主线程

 ExecutorService executors = Executors.newSingleThreadExecutor();        Future future = executors.submit(new Callable() {            @Override            public String call() throws Exception {                Log.e(TAG, "start thread= " + Thread.currentThread());                //do something                Thread.sleep(2000);                return "1234";            }        });        Observable.from(future).subscribe(new Action1() {            @Override            public void call(String s) {                Log.e(TAG, "end thread= " + Thread.currentThread() + " :  value= " + s);            }        });        Log.e(TAG,"back to main Thread");

输出结果是

01-28 17:30:26.635  E/Main: start thread= Thread[pool-1-thread-1,5,main]01-28 17:30:28.635  E/Main: end thread= Thread[main,5,main] :  value= 123401-28 17:30:28.636  E/Main: back to main Thread

可见处理分别是在两个线程,并且确实阻塞了主线程

除此之外,还有一个方法和这个类似

public static  Observable fromCallable(Callable<? extends T> func)

这个省去了创建线程的过程,直接通过callable方法返回处理后的值,和上面不同的是,这个处理的线程和subscriber要发布的是同一个线程;Future是异步处理可以监测超时,而callable则是同步处理,操作更简单

public static  Observable from(Future<? extends T> future, Scheduler scheduler)

这里多了一个参数Scheduler,这个是Rx的线程概念,Rx是支持处理和发布的线程随时切换的

比如我们指定线程是Rx的io线程

        Log.e(TAG,"thread0= " +Thread.currentThread());        ExecutorService executorService = Executors.newSingleThreadExecutor();        Future future = executorService.submit(new Callable() {            @Override            public String call() throws Exception {                Log.e(TAG,"thread1= " +Thread.currentThread());                return "1234";            }        });        Observable.from(future,Schedulers.io()).subscribe(new Action1() {            @Override            public void call(String s) {                Log.e(TAG,"thread2= " +Thread.currentThread());            }        });

输出结果

E/Main: thread0= Thread[main,5,main]E/Main: thread1= Thread[pool-1-thread-1,5,main]E/Main: thread2= Thread[RxCachedThreadScheduler-1,5,main]

我们会发现发布线程变成了Rx的cache线程,这就实现了线程切换,最终改变的发布的线程,其实

public static  Observable from(Future<? extends T> future, Scheduler scheduler) {        return create(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);    }

这个方法就是调用了切换线程的方法

public static  Observable from(Iterable<? extends T> iterable)

这个传入的是一个迭代器对象,而实现这个借口的基本都是Collection;我们常用的List和Set都是这个范畴,用法和传入数组的基本类似

  List list = new ArrayList<>(Arrays.asList(1,2,3));        Observable.from(list).subscribe(new Action1() {            @Override            public void call(Integer integer) {                //do something            }        });    }

interval

public static Observable interval(long interval, TimeUnit unit)public static Observable interval(long initialDelay, long period, TimeUnit unit)public static Observable interval(long interval, TimeUnit unit, Scheduler scheduler)public static Observable interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

这个是一个定时计数的方法,看下具体内容

final Worker worker = scheduler.createWorker();        child.add(worker);        worker.schedulePeriodically(new Action0() {            long counter;            @Override            public void call() {                try {                    child.onNext(counter++);                } catch (Throwable e) {                    try {                        worker.unsubscribe();                    } finally {                        Exceptions.throwOrReport(e, child);                    }                }            }                    }, initialDelay, period, unit);

这个方法会根据设置的间隔和初始依次叠加数值,从0开始无上限,一般会搭配take使用,take表示取值的最大个数

public static Observable interval(long interval, TimeUnit unit)public static Observable interval(long initialDelay, long period, TimeUnit unit)

第一个方法的interval表示间隔,unit表示时间类型

第二个方法的initalDelay表示开始计时的延迟时间,period表示间隔,unit表示时间类型(前两个参数都是用这个类型)

 Log.e(TAG, "start count");        Observable.interval(2, 1, TimeUnit.SECONDS).take(5).subscribe(new Action1() {            @Override            public void call(Long aLong) {                Log.e(TAG, "result = " + aLong);            }        });

输出结果是

18:10:27.603 E/Main: start count18:10:29.619 E/Main: result = 018:10:30.620 E/Main: result = 118:10:31.619 E/Main: result = 218:10:32.617 E/Main: result = 318:10:33.618 E/Main: result = 4

所以是延迟2s后依次打印相应数据

注: 这个call回调发生在compution线程,非主线程

public static Observable interval(long interval, TimeUnit unit, Scheduler scheduler)public static Observable interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

这里提供了切换线程的方法,让实际操作回调反生在我们定义的线程,比如上面的可以改为

Observable.interval(2, 1, TimeUnit.SECONDS,AndroidSchedulers.mainThread()).take(5).subscribe(new Action1() {            @Override            public void call(Long aLong) {                //ui 操作            }        });

timer

public static Observable timer(long delay, TimeUnit unit)public static Observable timer(long delay, TimeUnit unit, Scheduler scheduler)

这个用于延迟执行某些操作的方法,只会回调一次

 Worker worker = scheduler.createWorker();        child.add(worker);        worker.schedule(new Action0() {            @Override            public void call() {                try {                    child.onNext(0L);                } catch (Throwable t) {                    Exceptions.throwOrReport(t, child);                    return;                }                child.onCompleted();            }        }, time, unit);

这里返回0之后直接回调onCompletd了,也就是延迟执行任务就结束了

 Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {            @Override            public void call(Long aLong) {            //do something            }        });

这个回调也是发生在compution线程中的,所以也要通过Scheduler 去切换相应线程

range

public static Observable range(int start, int count)public static Observable range(int start, int count, Scheduler scheduler)

看下具体实现

 public static Observable range(int start, int count) {        ...        return Observable.create(new OnSubscribeRange(start, start + (count - 1)));    }void fastpath() {    final long end = this.end + 1L;    final Subscriber<? super Integer> o = this.o;    for (long i = index; i != end; i++) {          if (o.isUnsubscribed()) {              return;          }        o.onNext((int) i);      }      if (!o.isUnsubscribed()) {        o.onCompleted();       } }

实际就是从start开始计数,回调count次后终止

 Observable.range(3, 5).subscribe(new Action1() {            @Override            public void call(Integer integer) {                Log.e(TAG, "result= " + integer);            }        });

输出结果是

E/Main: result= 3E/Main: result= 4E/Main: result= 5E/Main: result= 6E/Main: result= 7

never

public static  Observable never()

这个返回的Observable对象,同时方法内不做任何处理,相当于直接跳过此次,不做处理

NeverObservable() {            super(new OnSubscribe() {                @Override                public void call(Subscriber<? super T> observer) {                    // do nothing                }            });        }

一般配合flatmap方法使用,具体使用场景并不多

举个例子,比如我们要打印一个数组里比3大的所有数,其他的我不想处理

 List array = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));        Observable.from(array).flatMap(new Func1>() {            @Override            public Observable call(Integer integer) {                if (integer.intValue() <= 3) {                    return Observable.never();                }                return Observable.just(integer);            }        }).subscribe(new Action1() {            @Override            public void call(Integer integer) {                Log.e(TAG, "result= " + integer);            }        });

这里我们比3小的直接跳过,会继续后面的遍历而不影响整体流程,类比于我们的continue;输出结果是

E/Main: result= 4E/Main: result= 5E/Main: result= 6

empty

 public static  Observable empty()
private static final class EmptyHolder {        final static Observable INSTANCE = create(new OnSubscribe() {            @Override            public void call(Subscriber<? super Object> subscriber) {                subscriber.onCompleted();            }        });    }   

这个相当于直接跳出,相当于我们用的break,直接跳出回调完成的方法

比如下面的例子

 Observable.from(array).flatMap(new Func1>() {            @Override            public Observable call(Integer integer) {                if (integer > 3) {                   return Observable.empty();                }                return Observable.just(integer);            }        }).subscribe(new Action1() {            @Override            public void call(Integer integer) {                Log.e(TAG, "result= " + integer);            }        });

当发现第一个大于3的数的时候直接跳出循环,输出结果是

E/Main: result= 1E/Main: result= 2E/Main: result= 3

所以empty会影响整体流程,后续的发布事件全部都不会再接受,直接终止

error

public static  Observable error(Throwable exception)
private static class ThrowObservable extends Observable {        public ThrowObservable(final Throwable exception) {            super(new OnSubscribe() {                @Override                public void call(Subscriber<? super T> observer) {                    observer.onError(exception);                }            });        }    }

顾名思义,这个会直接抛出定义的异常,类似于我们平时定义的throw Exception,这个异常会直接到subscriber的onError里,同时结束整个流程(onCompleted和onError最终只会执行一个)

 Observable.from(array).flatMap(new Func1>() {            @Override            public Observable call(Integer integer) {                if (integer > 3) {                   return Observable.error(new RuntimeException("larger than 3"));                }                return Observable.just(integer);            }        }).subscribe(new Subscriber() {            @Override            public void onCompleted() {                Log.e(TAG,"onCompleted  ");            }            @Override            public void onError(Throwable e) {                Log.e(TAG,"onError  "+e);            }            @Override            public void onNext(Integer integer) {                Log.e(TAG,"onNext  "+integer);            }        });

输出结果是

E/Main: onNext  1E/Main: onNext  2E/Main: onNext  3E/Main: onError  java.lang.RuntimeException: larger than 3

defer

public static  Observable defer(Func0> observableFactory)
public final class OnSubscribeDefer implements OnSubscribe {    final Func0<? extends Observable<? extends T>> observableFactory;    public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {        this.observableFactory = observableFactory;    }    @Override    public void call(final Subscriber<? super T> s) {        Observable<? extends T> o;        try {            //开始创建新的Observable对象            //call 返回的就是上面的 Observable<? extends T>            o = observableFactory.call();        } catch (Throwable t) {            Exceptions.throwOrReport(t, s);            return;        }        //开始注册subscriber        o.unsafeSubscribe(Subscribers.wrap(s));    }    }public interface Func0 extends Function, Callable {    @Override    R call();} public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {        try {            //省略部分代码,类似代码输出是            //和上篇流程基本一致            ...            subscriber.onStart();            onSubscribe.call(subscriber);            return subscriber;        } catch (Throwable e) {              ...        }            return Subscriptions.unsubscribed();        }    }

我们知道只有当subscribe订阅后才会开始发布数据,Observable一般是开始就创建的;从源码可以看出来,defer这个在订阅之后才开始调用call方法创建Observable对象并开始绑定subscriber,类比的话就是我们常说的懒加载模式,使用方法也和之前的类似,只是把最初的创建挪到这个位置

Observable.defer(new Func0>() {            @Override            public Observable call() {                return Observable.just(1,2,3);            }        }).subscribe(new Action1() {            @Override            public void call(Integer integer) {                Log.e(TAG,"defRes  "+integer);            }});

输出结果是

E/Main: defRes  1E/Main: defRes  2E/Main: defRes  3

和前面的用法没什么差别

更多相关文章

  1. Android(安卓)Junit Test
  2. Android中怎么动态控制padding
  3. Android类参考---Fragment
  4. Handler:使用方式
  5. Android(安卓)纯代码加入点击效果
  6. Android(安卓)中TextView中跑马灯效果的实现方法
  7. Android开发之去掉title的三种方法
  8. [unity3d]Unity3D与android交互----构建android插件
  9. android画图-----DensityActivity 添加view

随机推荐

  1. 经历 | 寒冷的冬季,三本苦逼的找实习之路
  2. linux 复习
  3. 【TensorFlow】使用迁移学习训练自己的模
  4. 十分钟学会reqests模块爬取数据——从爬
  5. 购买并在自己的云服务器上配置anaconda
  6. GitHub上3k+star的python爬虫库你了解吗?
  7. Flutter | 什么是 Flutter?(送书)
  8. 怎么样描述你的数据——用python做描述性
  9. 手机上利用python进行数据分析——创建自
  10. python数据分析——在python中实现线性回