下面展示了可用于多个Observables的各种操作符
- startWith()---在数据序列的开头增加一项数据
- merge()---将多个Observable合并为一个
- mergeDelayError()---合并多个Observables,让没有错误的Observable都完成后再发射错误通知
- zip()---使用一个函数组合多个Observable发射的数据集合,然后在发射这个结果
- and(),then(),and when()---(rxjava-joins)通过模式和计划组合多个Observables发射的数据集合
- combineLatest()---当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。
- join() and groupJoin()---无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射。
- switchOnNext()---将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据。
###StartWith 在数据序列的开头插入一条指定的项
/** * startWith操作符 解释:在源Observable输出之前插入指定数据项 */ private static void test() { Observable.just(7, 8, 9).startWith(11, 12) .subscribe(new Observer() { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onError(Throwable e) { System.out.println("onError " + e.toString()); } @Override public void onNext(Integer t) { System.out.println("onNext t = " + t); } }); }复制代码
###Merge操作符 合并多个Observables的发射物
Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
正如图例上展示的,任何一个原始Observable的onError通知会被立即传递给观察者,而且会终止合并后的Observable。
在很多ReactiveX实现中还有一个叫MergeDelayError的操作符,它的行为有一点不同,它会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。
RxJava将它实现为merge, mergeWith和mergeDelayError。
/** * Merge 解释:将2-9个Observables合并到一个Observable中进行发射,合并后的数据可能会是交错(无序)的(如果想要没有交错, * 可以使用concat操作符) merge还可以传递一个Observable列表List,数组 * 甚至是一个发射Observable序列的Observable,merge将合并它们的输出作为单个Observable的输出 */ private static void test1() { ObservableletterObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H"); Observable numberObservable = Observable.just(1, 2, 3, 4, 5); Observable.merge(letterObservable, numberObservable) .subscribe(new Action1 () { @Override public void call(Serializable value) { System.out.println("value = " + value); } }); }复制代码
###Zip操作符 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
RxJava将这个操作符实现为zip和zipWith。
zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。 ###zipWith
zip和zipWith默认不在任何特定的操作符上执行。
/** * Zip * 结合两个或多个Observables发射的数据项,每个数据只能组合一次,而且都是有序的。 * 它只发射与发射数据项最少的那个Observable一样多的数据。 * * 应用场景参考:http://blog.csdn.net/jdsjlzx/article/details/51724087 */ private static void test2() { ObservableletterObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H"); Observable numberObservable = Observable.just(1, 2, 3, 4, 5); Observable.zip(letterObservable, numberObservable, new Func2 () { @Override public String call(String t1, Integer t2) { return t1 + t2; } }).subscribe(new Action1 () { @Override public void call(String value) { System.out.println("value = " + value); } }); }复制代码
###CombineLatest操作符 当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
/** * CombineLatest * 解释:combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。 * 这两个Observable中任意一个Observable产生的结果,都和另一个Observable最后产生的结果,按照一定的规则进行合并。 */ private static void test3() { ObservableletterObservable = Observable.just("A", "B", "C"); Observable numberObservable = Observable.just(4, 5); //letterObservable numberObservable谁在前谁在后都会对执行结果会有影响 Observable.combineLatest( numberObservable, letterObservable,new Func2 () { @Override public String call(Integer t1, String t2) { System.out.println("combine t1 = " + t1 + " | t2 = " + t2);// t1的值一开始就是letterObservable中的最后一个值 return t1 + t2; } }).subscribe(new Action1 () { @Override public void call(String value) { System.out.println("value = " + value); } }); }复制代码
###withLatestFrom操作符
###Join操作符 任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
/** * Join 类似于combineLatest操作符,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内, * 可以与另一个Observable产 生的结果按照一定的规则进行合并 * * Join(Observable,Func1,Func1,Func2) 需要传递四个参数 * * join操作符的用法如下: observableA.join(observableB, observableA产生结果生命周期控制函数, * observableB产生结果生命周期控制函数, observableA产生的结果与observableB产生的结果的合并规则) * * 一句话概括:在observableA的生命周期内:observableB输出的数据项与observableA输出的数据项每个合并 * * test4() 没有任何合并结果输出 分析:同一线程:observableA的生命周期已经执行完了,observableB还没出来,所以合并不了 * */ private static void test4() { ObservableobservableA = Observable.range(1, 5); List data = Arrays.asList(6, 7, 8, 9, 10); Observable observableB = Observable.from(data); observableA.join(observableB, new Func1 >() { @Override public Observable call(Integer value) { //return Observable.just(value); return Observable.just(value).delay(1, TimeUnit.SECONDS); } }, new Func1 >() { @Override public Observable call(Integer value) { return Observable.just(value); } }, new Func2 () { @Override public Integer call(Integer value1, Integer value2) { System.out.println("left: " + value1 + " right:" + value2); return value1 + value2; } }).subscribe(new Observer () { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext value = " + value); } }); }复制代码