7. Observable的结合操作

2019/12/30 Rxjava2

# 简要:

需求了解:

在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等。这个时候我们就需要使用 RxJava 的结合操作符来完成这一需求,Rx中提供了丰富的结合操作处理的操作方法。

可用于组合多个Observables的操作方法:

  • CombineLatest:当Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据,然后发射这个函数的结果。
  • Join:只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
  • Merge:合并多个Observables的发射物,可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
  • Zip:通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体严格按照数量以及顺序发射单个数据项。
  • StartWith:在数据序列的开头插入一条指定的数据项或者数据序列。
  • SwitchOnNext:将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最新发射的Observable的数据项。

# 1. CombineLatest

当 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest 操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据。 CombineLatest 则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一 个函数结合它们最近发射的数据,然后发射这个函数的返回值。

img-CombineLatest 解析: combineLatest 操作符可以结合多个Observable,可以接收 2-9 个Observable对象, 在其中原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。此外combineLatest 操作符还有一些接收 Iterable , 数组方式的变体,以及其他指定参数combiner、bufferSize、和combineLatestDelayError方法等变体,在此就不在详细展开了,有兴趣的可以查看官方的相关API文档 (opens new window)了解。

实例代码:

	// Observables 创建
	Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
	Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
	Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
	
	// 1. combineLatest(ObservableSource, ObservableSource [支持2-9个参数]...,  BiFunction)
	// 结合多个Observable, 当他们其中任意一个发射了数据时,使用函数结合他们最近发射的一项数据
	Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
	
		@Override
		public String apply(Long t1, Long t2) throws Exception {
			System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
			if (t1 + t2 == 10) {
				return "Success";   // 满足一定条件,返回指定的字符串
			}
			return t1 + t2 + "";    // 计算所有数据的和并转换为字符串
		}
	}).subscribe(new Consumer<String>() {
	
		@Override
		public void accept(String t) throws Exception {
			System.out.println("----> accept combineLatest(1): " + t);
		}
	});
	
	System.out.println("--------------------------------------------------------");
	// 2. combineLatest(T1, T2, T3, Function)
	// Observables的结合
	Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
		@Override
		public String apply(Long t1, Long t2, Long t3) throws Exception {
			System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
			return t1 + t2 + t3 + "";   // 计算所有数据的和并转换为字符串
		}
	}).subscribe(new Consumer<String>() {
		@Override
		public void accept(String t) throws Exception {
			System.out.println("--> accept(2): " + t);
		}
	});

输出:

--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114

Javadoc: combineLatest(T1, T2, T3... , T9, combiner) (opens new window)

# 2. Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

img-join Join 操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。

解析: join(other, leftEnd, rightEnd, resultSelector) 相关参数的解析

  • other: 源Observable与其组合的目标Observable。
  • leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期。
  • rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期。
  • resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象。

注意: 这是源Observable和目标Observable发射数据在任意一个基于时间窗口的有效期内才会接收到组合数据,这就意味着可能有数据丢失的情况,在其中一个已经发射完所有数据,并且没有处于时间窗口的数据情况,另一个Observable的数据发射将不会收到组合数据。

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 1. join(other, leftEnd, rightEnd, resultSelector)
    // other: 目标组合的Observable
    // leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期
    // rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期
    // resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象
    sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            return "t1 = " + t1 + ", t2 = " + t2;                   // 对数据进行组合后返回和观察者
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(1): " + t);
        }
    });

    System.in.read();

输出:

-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14   // 此时源t1中已经没有数据还处于时间窗口有效期内

Javadoc: join(other, leftEnd, rightEnd, resultSelector) (opens new window)

groupJoin

groupJoin 操作符与 join 相同,只是参数传递有所区别。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector 可以将原始数据转换为 Observable 类型的数据发送给观察者。

img-groupJoin

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
    // groupJoin操作符与join相同,只是参数传递有所区别。
    // resultSelector可以将原始数据转换为Observable类型的数据发送给观察者。
    sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Observable<Long>, Observable<String>>() {
        @Override
        public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
            System.out.println("--> apply(2) combine: " + t1);            // 结合操作
            return t2.map(new Function<Long, String>() {
                @Override
                public String apply(Long t) throws Exception {
                    System.out.println("-----> apply(2) operation: " + t);
                    return "t1 = " + t1 + ", t2 = " + t;
                }
            });
        }
    }).subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
        }
    });

输出:

-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14

Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector) (opens new window)

# 3. Merge

合并多个Observables的发射物。

img-Merge

使用 Merge 操作符你可以将多个Observables的输出合并,就好像它们是一个单个的 Observable 一样。

# 3.1 merge

Merge 可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat 不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物),任何一个原始Observable的 onError 通知会被立即传递给观察者,而且会终止合并后的Observable。

img-merge

除了传递多个Observable给 merge ,你还可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable, merge 将合并它们的输出作为单个Observable的输出。

img-merge-observables

如果你传递一个发射Observables序列的Observable,你可以指定 merge 应该同时订阅的 Observable 的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了 onCompleted 通知。

示例代码:

    // 创建Observable对象
    Observable<Integer> odd = Observable.just(1, 3, 5);
    Observable<Integer> even = Observable.just(2, 4, 6);
    Observable<Integer> big = Observable.just(188888, 688888, 888888);

    // 创建list对象
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(odd);
    list.add(even);
    list.add(big);

    // 创建Array对象
    Observable<Integer>[] observables = new Observable[3];
    observables[0] = odd;
    observables[1] = even;
    observables[2] = big;

    // 创建发射Observable序列的Observable
    Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.just(1, 2));
            emitter.onNext(Observable.just(1, 2, 3));
            emitter.onNext(Observable.just(1, 2, 3, 4));
            emitter.onNext(Observable.just(1, 2, 3, 4, 5));
            emitter.onComplete();
        }
    });

    // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
    // 可接受 2-4 个Observable对象进行merge
    Observable.merge(odd, even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的列表List
    Observable.merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的数组Array
    Observable.mergeArray(observables)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
    // 可选参数, maxConcurrency: 最大的并发处理数
    // 接受一个发射Observable序列的Observable
    Observable.merge(sources)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 5. mergeWith(other)
    // merge 是静态方法, mergeWith 是对象方法: Observable.merge(odd,even) 等价于 odd.mergeWith(even)
    odd.mergeWith(even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(5): " + integer);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6

Javadoc: merge(source1, ... , source4) (opens new window)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize) (opens new window)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources) (opens new window)
Javadoc: merge(ObservableSource<ObservableSource> sources, int maxConcurrency) (opens new window)

# 3.2 mergeDelayError

如果传递给 merge 的任何一个的Observable发射了 onError通知终止了, merge 操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用 mergeDelayError

img-mergeDelayError

MergeDelayError 操作符,mergeDelayError 在合并与交错输出的使用上与 merge 相同,区别在于它会保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

由于MergeDelayError使用上和merge相同 ,所以这里就不做详细分析了,这里就简单描述其中的一种的使用实例。

实例代码:

    // 创建有Error的Observable序列的Observable
    Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {

        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.error(new Exception("Error Test1"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(2, 3));
            emitter.onNext(Observable.error(new Exception("Error Test2"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(4, 5, 6));
            emitter.onComplete();
        }
    });

    // 6. mergeDelayError
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.mergeDelayError(DelayErrorObservable)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(6)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(6): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(6): " + exceptions);
                    } else {
                        System.out.println("--> onError(6): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(6)");
                }
            });

输出:

--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]

Javadoc: mergeDelayError(source1, … , source4) (opens new window)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize) (opens new window)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources) (opens new window)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency) (opens new window)

# 4. Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个 结合体 发射单个数据项。

img-Zip

Zip 操作符与 Merge 类似,都是合并多个Observables的数据,返回一个Obversable,主要不同的是它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个Observable一样多的数据。

img-Zip-Sources

解析:

  1. Zip 操作符与 Merge 的使用上基本一致,主要不同的是 zip 发射的数据取决于发射数据项最少的那个Observable并且按照严格的顺序去结合数据。
  2. 同样具备静态方法 zip 与对象方法 zipWith,可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable。

使用上在此就不做详细的展开了,可参照上面的 Merge 使用方法,下面就针对 zip 的特性实现一个简单的实例。

实例代码:

    // 创建Observable
    Observable<Integer> observable1 = Observable.just(1, 2, 3);
    Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
    
    // zip(sources)
    // 可接受2-9个参数的Observable,对其进行顺序合并操作,最终合并的数据项取决于最少的数据项的Observable
    Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer t1, Integer t2) throws Exception {
            System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
            return t1 + t2 + "";
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept: " + s);  // 最终接受observable1全部数据项与observable2相同数量顺序部分数据
        }
    });

输出:

--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6

Javadoc: zip( source1, source2, ... , source9, zipper ) (opens new window)
Javadoc: zip( Iterable sources, Function zipper ) (opens new window)
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize) (opens new window)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources ) (opens new window)
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper ) (opens new window)

# 5. StartWith

在数据序列的开头插入一条指定的数据项或者数据序列。

img-StartWith 如果你想要一个Observable在发射数据之前先发射一个指定的数据或者数据序列(可以是单个数据、数组、列表,Observable中的数据),可以使 用 StartWith 操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用 Concat 操作符。)

img-StartWith-Items

实例代码:

    // 创建列表List
    List<Integer> lists = new ArrayList<>();
    lists.add(999);
    lists.add(9999);
    lists.add(99999);

    // 创建数组Array
    Integer[] arrays = new Integer[3];
    arrays[0] = 999;
    arrays[1] = 9999;
    arrays[2] = 9999;

    // 1. startWith(item)
    // 在Observable数据发射前发射item数据项
    Observable.just(1, 2, 3)
            .startWith(999)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 2. startWith(Iterable items)
    // 在Observable数据发射前发射items列表中的数据序列
    Observable.just(1, 2, 3)
            .startWith(lists)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 3. startWithArray(items)
    // 在Observable数据发射前发射items数组中的数据序列
    Observable.just(1, 2, 3)
            .startWithArray(arrays)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 4. startWith(ObservableSource other)
    // 在Observable数据发射前发射other中的数据序列
    Observable.just(1, 2, 3)
            .startWith(Observable.just(999, 9999, 99999))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

输出:

--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3

Javadoc: startWith(item) (opens new window)
Javadoc: startWith(Iterable items) (opens new window)
Javadoc: startWithArray(items) (opens new window)
Javadoc: startWith(ObservableSource other) (opens new window)

# 6. SwitchOnNext

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些 Observables最近发射的数据项。

# 6.1 switchOnNext

switchOnNext 订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个, switchOnNext 发射的这个新Observable并取消订阅前一个发射数据的旧Observable,开始发射最新的Observable发射的数据。

img-switchOnNext

注意: 当原始Observables发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在 后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射 的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

# 6.2 switchOnNextDelayError

Observables发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,如果Observables中的Observable有 Error 异常,将保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把 onError 传递给观察者。

img-switchOnNextDelayError

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

实例代码:

    // 创建Observable
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);

    // 创建发射Observable序列的Observable
    Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 创建发射含有Error通知的Observable序列的Observable
    Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
            emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
    // 可选参数 bufferSize: 缓存数据项大小
    // 接受一个发射Observable序列的Observable类型的sources,
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
    Observable.switchOnNext(sources)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("--------------------------------------------------------------------");
    // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
    // 可选参数 prefetch: 与读取数据项大小
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.switchOnNextDelayError(sourcesError)
            .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("--> onNext(2): " + t);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(2): " + exceptions);
                    } else {
                        System.out.println("--> onError(2): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]

Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize) (opens new window)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch) (opens new window)

# 小结

Rxjava 的合并操作符能够同时处理多个被观察者,并发送相应的事件通知以及数据。常常应用于多业务合并处理场景,比如表单的联动验证,网络交互性数据的校验等,rxjava的合并操作符能够很好的去实现和处理。

提示:以上使用的Rxjava2版本: 2.2.12 (opens new window)

实例代码: