11. Observable的条件操作符

2019/12/30 Rxjava2

# 简要:

需求了解:

在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运, Rxjava 中已经有了很多条件操作符,一起来了解一下吧。

下面列出了一些Rxjava的用于条件操作符:

  • Amb:给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。
  • DefaultIfEmpty:发射来自原始Observable的值,如果原始 Observable 没有发射任何数据项,就发射一个默认值。
  • SwitchIfEmpty:如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。
  • SkipUntil:丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。
  • SkipWhile:丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。
  • TakeUntil:发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

# 1. Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

img-Amb

解析: 对多个Observable进行监听,首先发射通知(包括数据)的Observable将会被观察者观察,发射这个Observable的所有数据。

示例代码:

    // 创建Observable
   Observable<Integer> delayObservable = Observable.range(1, 5)
                                            										.delay(100, TimeUnit.MILLISECONDS); // 延迟100毫秒发射数据
    Observable<Integer> rangeObservable = Observable.range(6, 5);

    // 创建Observable的集合
    ArrayList<Observable<Integer>> list = new ArrayList<>();
    list.add(delayObservable);
    list.add(rangeObservable);

    // 创建Observable的数组
    Observable<Integer>[] array = new Observable[2];
    array[0] = delayObservable;
    array[1] = rangeObservable;

    /**
     *  1. ambWith(ObservableSource<? extends T> other)
     *  与另外一个Observable比较,只发射首先发射通知的Observable的数据
     */
    rangeObservable.ambWith(delayObservable)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  2. amb(Iterable<? extends ObservableSource<? extends T>> sources)
     *  接受一个Observable类型的集合, 只发射集合中首先发射通知的Observable的数据
     */
    Observable.amb(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  3. ambArray(ObservableSource<? extends T>... sources)
     *  接受一个Observable类型的数组, 只发射数组中首先发射通知的Observable的数据
     */
    Observable.ambArray(array)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.in.read();

输出:

--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
------------------------------------------------
--> accept(2): 6
--> accept(2): 7
--> accept(2): 8
--> accept(2): 9
--> accept(2): 10
------------------------------------------------
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10

Javadoc: ambWith(ObservableSource other) (opens new window)
Javadoc: amb(Iterable sources) (opens new window)
Javadoc: ambArray(ObservableSource... sources) (opens new window)

# 2. DefaultIfEmpty

发射来自原始Observable的值,如果原始 Observable 没有发射数据项,就发射一个默认值。 img-DefaultIfEmpty

解析: DefaultIfEmpty 简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就发射一个你提供的默认值。如果你需要发射更多的数据,或者切换备用的Observable,你可以考虑使用 switchIfEmpty 操作符 。

示例代码:

    /**
     *   defaultIfEmpty(@NotNull T defaultItem)
     *  如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式),
     *  DefaultIfEmpty 返回的Observable就发射一个你提供的默认值defaultItem。
     */
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onComplete();   // 不发射任何数据,直接发射完成通知
        }
    }).defaultIfEmpty("No Data emitter!!!")
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("--> onNext: " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

输出:

--> onSubscribe
--> onNext: No Data emitter!!!
--> onComplete

Javadoc: defaultIfEmpty(T defaultItem) (opens new window)

# 3. SwitchIfEmpty

如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

img-SwitchIfEmpty

解析: 如果原始 Observable 没有发射数据时,发射切换指定的 other 继续发射数据。

示例代码:

    /**
     *  switchIfEmpty(ObservableSource other)
     *  如果原始Observable没有发射数据时,发射切换指定的other继续发射数据
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();           // 不发射任何数据,直接发射完成通知
        }
    }).switchIfEmpty(Observable.just(888))  // 如果原始Observable没有发射数据项,默认发射备用的Observable,发射数据项888
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext: " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

输出:

--> onSubscribe
--> onNext: 888
--> onComplete

Javadoc: switchIfEmpty(ObservableSource other) (opens new window)

# 4. SkipUntil

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。

img-SkipUntil 示例代码:

    /**
     *  skipUntil(ObservableSource other)
     *  丢弃原始Observable发射的数据,直到other发射了一个数据,然后发射原始Observable的剩余数据。
     */
    Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
            // 丢弃2000毫秒的原始Observable发射的数据,接受后面的剩余部分数据
            .skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext: " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

    System.in.read();

输出:

--> onSubscribe
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipUntil(ObservableSource other) (opens new window)

# 5. SkipWhile

丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

img-SkipWhile

示例代码:

        /**
         *  skipWhile(Predicate<? super T> predicate)
         *  丢弃原始 Observable 发射的数据,直到函数predicate的条件为假,然后发射原始Observable剩余的数据。
         */
        Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
                .skipWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(Long aLong) throws Exception {
                        if (aLong > 5) {
                            return false;       // 当原始数据大于5时,发射后面的剩余部分数据
                        }
                        return true;            // 丢弃原始数据项
                    }
                }).subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("--> onNext: " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete");
                    }
                });

        System.in.read();

输出:

--> onSubscribe
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipWhile(Predicate predicate) (opens new window)

# 6. TakeUntil

发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

img-takeUntil

# 6.1 takeUntil(ObservableSource other)

TakeUntil 订阅并开始发射原始 Observable,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,TakeUntil 返回的 Observable 会停止发射原始 Observable 并终止。

img-takeUntil-other

解析: 第二个Observable发射一项数据或一个 onError 通知或一个 onCompleted 通知都会导致 takeUntil 停止发射数据。

示例代码:

    // 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据
    Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

    /**
     *  1. takeUntil(ObservableSource other)
     *  发射来自原始Observable的数据,直到other发射了一个数据或一个通知后停止发射原始Observable并终止。
     */
    observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止发射原始数据
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(1): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeUntil(ObservableSource other) (opens new window)

# 6.2 takeUntil(Predicate stopPredicate)

每次发射数据后,通过一个谓词函数来判定是否需要终止发射数据。

img-takeUntil-predicate

解析: 每次发射数据后,通过一个谓词函数 stopPredicate 来判定是否需要终止发射数据,如果 stopPredicate 返回 true 怎表示停止发射原始Observable后面的数据,否则继续发射后面的数据。

示例代码:

    /**
     *  2. takeUntil(Predicate<? super T> stopPredicate)
     *  每次发射数据后,通过一个谓词函数stopPredicate来判定是否需要终止发射数据
     *  如果stopPredicate返回true怎表示停止发射后面的数据,否则继续发射后面的数据
     */
    observable.takeUntil(new Predicate<Long>() {
        @Override
        public boolean test(Long aLong) throws Exception {  // 函数返回false则为继续发射原始数据,true则停止发射原始数据
            if(aLong > 5){
                return true;      // 满足条件后,停止发射数据
            }
            return false;         // 继续发射数据
        }
    }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Long aLong) {
            System.out.println("--> onNext(2): " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    System.in.read();

输出:

--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onNext(2): 3
--> onNext(2): 4
--> onNext(2): 5
--> onNext(2): 6
--> onComplete(2)

Javadoc: takeUntil(Predicate stopPredicate) (opens new window)

# 7. TakeWhile

发射原始Observable的数据,直到一个特定的条件,然后跳过剩余的数据。

img-TakeWhile

解析: 发射原始 Observable 的数据,直到 predicate 的条件为 false ,然后跳过剩余的数据。

示例代码:

        // 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据
        Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

        /**
         *  takeWhile(Predicate predicate)
         *  发射原始Observable的数据,直到predicate的条件为false,然后跳过剩余的数据
         */
        observable.takeWhile(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) throws Exception {  // 函数返回值决定是否继续发射后续的数据
                if(aLong > 5){
                    return false;        // 满足条件后跳过后面的数据
                }
                return true;             // 继续发射数据
            }
        }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("--> onNext: " + aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
        });

        System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeWhile(Predicate predicate) (opens new window)

# 小结

本节主要介绍了Rxjava条件操作符可以根据不同的条件进行数据的发射,变换等相关行为。

提示:以上使用的Rxjava2版本: 2.2.12 (opens new window)

实例代码: