8. ConnectableObservable 连接的操作

2019/12/30 Rxjava2

# 简要:

需求了解:

Rxjava中的普通的 Observable 在观察者订阅的时候就会发射数据,但是有的时候我们想自己控制数据的发射,比如在有指定的观察者或者全部的观察者订阅后开始发射数据,这个时候我们就要要用到Rxjava中的可连接的Observable来完成这个需求。

这一节主要介绍 ConnectableObservable 和它的子类以及它们的操作符:

  • ConnectableObservable: 一个可连接的Observable,在订阅后不发射数据,调用 connect() 方法后开始发射数据。
  • Observable.publish():将一个Observable转换为一个可连接的Observable 。
  • ConnectableObservable.connect():指示一个可连接的Observable开始发射数据。
  • Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
  • ConnectableObservable.refCount():让一个可连接的Observable表现得像一个普通的Observable。
  • Observable.share():可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,等价于Observable.publish().refCount()
  • Observable.replay():保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

# 1. ConnectableObservable

一个可连接的Observable(ConnectableObservable)与普通的Observable差不多。不同之处:可连接的Observable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始。用这种方法,你可以等部分或者所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

img-ConnectableObservable 注意: ConnectableObservable 的线程切换只能通过 replay 操作符来实现,普通 Observable 的 subscribeOn()observerOn() 在 ConnectableObservable 中不起作用。可以通过 replay 操作符的指定线程调度器的方式来进行线程的切换。

Javadoc: ConnectableObservable (opens new window)

# 2. Publish

将普通的Observable转换为可连接的Observable(ConnectableObservable)。

如果要使用可连接的Observable,可以使用Observable的 publish 操作符,来将相应转换为ConnectableObservable对象。

有一个变体接受一个函数作为参数(publish(Function selector))。这个函数用原始Observable发射的数据作为参数,产生 一个新的数据作为 ConnectableObservable 给发射,替换原位置的数据项。实质是在签名的基础上添加一个 Map 操作。

简单实例:

  // 1. publish()
  // 创建ConnectableObservable
  ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
          .publish();    // publish操作将Observable转化为一个可连接的Observable

	// 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
  // 接受原始Observable的数据,产生一个新的Observable,可以对这个Observable进行函数处理
  Observable<String> publish = Observable.range(1, 5)
          .publish(new Function<Observable<Integer>, ObservableSource<String>>() {

              @Override
              public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
                  System.out.println("--> apply(4): " + integerObservable.toString());

                  Observable<String> map = integerObservable.map(new Function<Integer, String>() {

                      @Override
                      public String apply(Integer integer) throws Exception {
                          return "[this is map value]: " + integer * integer;
                      }
                  });
                  return map;
              }
          });
          
    publish.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(4): " + s);
        }
    });

输出:

--> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649
--> accept(4): [this is map value]: 1
--> accept(4): [this is map value]: 4
--> accept(4): [this is map value]: 9
--> accept(4): [this is map value]: 16
--> accept(4): [this is map value]: 25

Javadoc: Observable.publish() (opens new window)
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector) (opens new window)

# 3. Connect

让一个可连接的Observable开始发射数据给订阅者。

  • 可连接的Observable (connectableObservable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了 Connect 操作符时才会开始。
  • RxJava中 connect 是 ConnectableObservable 接口的一个方法,使用 publish 操作符可以将一个普通的Observable转换为一个 ConnectableObservable 。
  • 调用 ConnectableObservable 的 connect 方法会让它后面的Observable开始给发射数据给订阅 者。connect 方法返回一个 Subscription 对象,可以调用它的 unsubscribe 方法让Observable停 止发射数据给观察者。
  • 即使没有任何订阅者订阅它,你也可以使用 connect 方法让一个Observable开始发射数据 (或者开始生成待发射的数据)。这样,你可以将一个"冷"的Observable变为"热"的。

实例代码:

    // 1. publish()
    // 创建ConnectableObservable
    ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
            .publish();    // publish操作将Observable转化为一个可连接的Observable

    // 创建普通的Observable
    Observable<Integer> range = Observable.range(1, 5);

    // 1.1 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(1)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(1): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1)");
        }
    });

    // 1.2 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    // 1.3 普通Observable在被订阅时就会发射数据
    range.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(3)");
        }
    });

    System.out.println("----------------start connect------------------");
    // 可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始发射数据
    // connectableObservable.connect();
    
    // 可选参数Consumer,返回一个Disposable对象,可以获取订阅状态和取消当前的订阅
    connectableObservable.connect(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> connect accept: " + disposable.isDisposed());
            // disposable.dispose();
        }
    });

输出:

--> onSubscribe(1)
--> onSubscribe(2)
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onComplete(3)
----------------start connect------------------
--> connect accept: false
--> onNext(1): 1
--> onNext(2): 1
--> onNext(1): 2
--> onNext(2): 2
--> onNext(1): 3
--> onNext(2): 3
--> onNext(1): 4
--> onNext(2): 4
--> onNext(1): 5
--> onNext(2): 5
--> onComplete(1)
--> onComplete(2)

Javadoc: ConnectableObservable.connect() (opens new window)
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection) (opens new window)

# 4. RefCount

RefCount 的作用是让一个可连接的Observable行为像普通的Observable。

RefCount 操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable 时, RefCount 连接到下层的可连接Observable。 RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

解析: refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。

实例代码:

    /**
     * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
     *
     * 具有以下可选参数:
     * subscriberCount: 指定需要连接到上游的订阅者数量。注意:当订阅者满足此数量后才会处理
     * timeout:         所有订阅用户退订后断开连接前的等待时间
     * unit:            时间单位
     * scheduler:        断开连接之前要等待的目标调度器
     */
    Observable<Long> refCountObservable = Observable
            .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .publish()
            .refCount()
            .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
            .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程
        //  .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());

    // 第1个订阅者
    refCountObservable.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除当前的订阅
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 第2个订阅者
    refCountObservable.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(2, TimeUnit.SECONDS)   // 延迟2秒后订阅
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

输出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler) (opens new window)

# 5. Share

一个普通的Observable可以通过 publish 来将其转换为ConnectableObservable,然后可以调用其 refCount() 的方法将其转换为一个具有 ConnectableObservable 特性的Observable。

其实Observable中还有一个操作方法,可以直接完成此步骤的操作,这就是 Observable.share() 操作符。

可以来看一下share操作符的源码:

    public final Observable<T> share() {
        return publish().refCount();
    }

通过源码可以知道,share() 方法可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,即Observable.publish().refCount() == Observable.share()

实例代码:

    // share()
    // 通过share() 同时应用 publish 和 refCount 操作
    Observable<Long> share = Observable
            .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
      //    .publish().refCount()
            .share()  // 等价于上面的操作
            .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
            .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程

    // 1. 第一个订阅者
    share.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除当前的订阅
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 2. 第二个订阅者
    share.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(1, TimeUnit.SECONDS)    // 延迟1秒后订阅
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

输出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: Observable.share() (opens new window)

# 6. Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

img-Replay

如果在将一个Observable转换为可连接的Observable之前对它使用 Replay 操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,可以缓存发射过的数据,即使那些观察者在这 个Observable开始给其它观察者发射数据之后才订阅。

注意: replay操作符生成的 connectableObservable ,如果没有对缓存进行限定,那么无论观察者何时去订阅,都可以收到 Observable 完整的数据序列项。

replay 操作符最好根据实际情况限定缓存的大小,否则数据发射过快或者较多时会占用很高的内存。replay 操作符有可以接受不同参数的变体,有的可以指定 replay 的最大缓存数量或者指定缓存时间,还可以指定调度器。

  • replay不仅可以缓存Observable的所有数据序列,也可以进行限定缓存大小的操作。
  • 还有有一种 replay 返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是 replay 变换之后的数据项。

实例代码:

    // 创建发射数据的Observable
    Observable<Long> observable = Observable
            .intervalRange(1,
                    10,
                    1,
                    500,
                    TimeUnit.MILLISECONDS,
                    Schedulers.newThread());

    /**
     * 1.1 replay(Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 接受原始数据的所有数据
     */
//  ConnectableObservable<Long> replay1 = observable.replay();

    /**
     * 1.2 replay(int bufferSize, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 只缓存 bufferSize 个最近的原始数据
     */
//  ConnectableObservable<Long> replay1 = observable.replay(1); // 设置缓存大小为1, 从原数据中缓存最近的1个数据

    /**
     * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 在订阅前指定的时间段内缓存 bufferSize 个数据, 注意计时开始是原始数据发射第1个数据项之后开始
     */
//  ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);

    /**
     * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 在订阅前指定的时间段内缓存数据, 注意计时开始是原始数据发射第1个数据项之后开始
     */
   ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);

    // 进行 connect 操作
    replay1.connect();

    // 第一个观察者
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-1)");
        }
    }).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println("--> accept(1-1): " + aLong);
        }
    });

    // 第二个观察者(延迟1秒后订阅)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-2)");
        }
    }).delaySubscription(1, TimeUnit.SECONDS)
      .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("--> accept(1-2): " + aLong);
            }
      });

    // 第三个观察者(延迟2秒后订阅)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-3)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
       .subscribe(new Consumer<Long>() {
           @Override
           public void accept(Long aLong) throws Exception {
               System.out.println("--> accept(1-3): " + aLong);
           }
       });

    System.in.read();
    System.out.println("----------------------------------------------------------");
    /**
     * 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
     * int bufferSize,             				 	可选参数: 指定从元数据序列数据的缓存大小
     * long time, TimeUnit unit,    	可选参数: 指定缓存指定时间段的数据序列
     * Scheduler scheduler)         		可选参数: 指定线程调度器
     *
     * 接受一个变换函数 function 为参数,这个函数接受原始Observable发射的数据项为参数
     * 通过指定的函数处理后,返回一个处理后的Observable
     */
    Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
            // 对原始数据进行处理
            Observable<String> map = longObservable.map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Exception {
                    return aLong + "² = " + aLong * aLong;  // 将原始数据进行平方处理,并转换为字符串数据类型
                }
            });

            return map;
        }
    }, 1, Schedulers.newThread());

    replayObservable.subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread());

    // 第一个观察者
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-1)");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(2-1): " + s);
        }
    });

    // 订阅第二个观察者 (延迟2秒后订阅)
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-2)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
      .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(2-2): " + s);
            }
       });

    System.in.read();

输出:

----> onSubScribe(1-1)
--> accept(1-1): 1
--> accept(1-1): 2
--> accept(1-1): 3
----> onSubScribe(1-2)
--> accept(1-2): 2
--> accept(1-2): 3
--> accept(1-1): 4
--> accept(1-2): 4
--> accept(1-1): 5
--> accept(1-2): 5
----> onSubScribe(1-3)
--> accept(1-3): 4
--> accept(1-3): 5
--> accept(1-1): 6
--> accept(1-2): 6
--> accept(1-3): 6
--> accept(1-1): 7
--> accept(1-2): 7
--> accept(1-3): 7
--> accept(1-1): 8
--> accept(1-2): 8
--> accept(1-3): 8
--> accept(1-1): 9
--> accept(1-2): 9
--> accept(1-3): 9
--> accept(1-1): 10
--> accept(1-2): 10
--> accept(1-3): 10
----------------------------------------------------------
--> onSubScribe(2-1)
--> accept(2-1): 1² = 1
--> accept(2-1): 2² = 4
--> accept(2-1): 3² = 9
--> accept(2-1): 4² = 16
--> onSubScribe(2-2)
--> accept(2-1): 5² = 25
--> accept(2-2): 1² = 1
--> accept(2-2): 2² = 4
--> accept(2-1): 6² = 36
--> accept(2-2): 3² = 9
--> accept(2-1): 7² = 49
--> accept(2-1): 8² = 64
--> accept(2-2): 4² = 16
--> accept(2-2): 5² = 25
--> accept(2-1): 9² = 81
--> accept(2-2): 6² = 36
--> accept(2-1): 10² = 100
--> accept(2-2): 7² = 49
--> accept(2-2): 8² = 64
--> accept(2-2): 9² = 81
--> accept(2-2): 10² = 100

Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) (opens new window)
Javadoc: Observable.replay(Function<Observable<T>,ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) (opens new window)

# 小结

Rxjava 的连接操作符主要的核心是 ConnectableObservable 这个可连接的Observable对象的概念。可连接的 Observable 在被订阅时并不会直接发射数据,只有在他的 connect() 方法被调用时才会发射数据。便于更好的对数据的发射行为的控制,同时也对数据有很好的操作能力,可以缓存数据,指定缓存大小,时间片段缓存等。

提示:以上使用的Rxjava2版本: 2.2.12 (opens new window)

实例代码: