4. Observable的数据变换(二)

2019/12/30 Rxjava2

# 1. Window

定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据。

img-window WindowBuffer 类似,但不是发射来自原始Observable的数据包,它发射的是 Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发 射一个 onCompleted 通知。

Buffer一样,Window 有很多变体,每一种都以自己的方式将原始Observable分解为多个作为结果的Observable,每一个都包含一个映射原始数据的 window 。用 Window操作符的术语描述就是,当一个窗口打开(when a window "opens")意味着一个新的Observable已经发射 (产生)了,而且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭 (when a window "closes")意味着发射(产生)的Observable停止发射原始Observable的数据, 并且发射终止通知 onCompleted 给它的观察者们。

在RxJava中有许多种Window操作符的方法。

# 1.1 window(closingSelector)

window 的这个方法会立即打开它的第一个窗口。每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并立即打开一个新窗口。用这个方法,这种 window 变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。

img-window(closingSelector) 解析: 一开始开启一个 window 接收原始数据,每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并取消此时订阅closingSelector 的Observable ( 此时可能是没有数据 window )并立即打开一个新窗口,注意: 每个窗口开启前都会去订阅一个closingSelector返回的 Observable。

实例代码:

	// 1. window(Callable boundary)
	// 开启一个window,并订阅观察boundary返回的Observable发射了一个数据,
	// 则关闭此window,将收集的数据以Observable发送, 重新订阅boundary返回的Observable,开启新window
	Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
		.window(new Callable<Observable<Long>>() {

			@Override
			public Observable<Long> call() throws Exception {
				System.out.println("--> call(1)");
				return Observable.timer(2, TimeUnit.SECONDS); // 两秒后关闭当前窗口
			}
		}).subscribe(new Consumer<Observable<Long>>() {

			@Override
			public void accept(Observable<Long> t) throws Exception {
				// 接受每个window接受的数据的Observable
				t.subscribe(new Consumer<Long>() {

					@Override
					public void accept(Long t) throws Exception {
						System.out.println("--> accept(1): " + t);
					}
				});
			}
		});

输出:

--> call(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> call(1)
--> accept(1): 4
--> accept(1): 5
--> call(1)
--> accept(1): 6
--> accept(1): 7
--> call(1)
--> accept(1): 8
--> accept(1): 9
--> call(1)
--> accept(1): 10

Javadoc: window(closingSelector) (opens new window)
Javadoc: window(closingSelector, bufferSize) (opens new window)

# 1.2 window(openingIndicator, closingIndicator)

openingIndicator 发射一个数据,就会打开一个 window, 同时订阅 closingIndicator 返回的Observable,当这个Observable发射一个数据,就结束此 window 和 ,发送收集数据的 Observable。

img-window(openingIndicator,  closingIndicator) 无论何时,只要 window 观察到 windowOpenings 这个Observable发射了一个 Opening 对象,它就打开一个窗口,并且同时调用 closingSelector 生成一个与那个窗口关联的关闭 (closing)Observable 。当这个关闭 (closing)Observable 发射了一个对象时,window 操作符就会关闭那个窗口以及关联的closingSelector的 Observable。

注意: 对这个方法来说,由于当前窗口的关闭和新窗口的打开是由单独的 Observable 管理的,它创建的窗口可能会存在重叠(重复某些来自原始Observable的数据) 或间隙(丢弃某些来自原始Observable的数据)

实例代码:

	// 2. window(ObservableSource openingIndicator, Function<T, ObservableSource<R>> closingIndicator)
	// 当openingIndicator发射一个数据,就会打开一个window, 同时订阅closingIndicator返回的Observable,
	// 当这个Observable发射一个数据,就结束此window以及对应的closingIndicator,发送收集数据的 Observable。
	Observable<Long> openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
			.doOnSubscribe(new Consumer<Disposable>() {

				@Override
				public void accept(Disposable t) throws Exception {
					System.out.println("--> openingIndicator is subscribe!");
				}
			}).doOnComplete(new Action() {
				
				@Override
				public void run() throws Exception {
					System.out.println("--> openingIndicator is completed!");
				}
			}).doOnNext(new Consumer<Long>() {

				@Override
				public void accept(Long t) throws Exception {
					System.out.println("--> openingIndicator emitter: " + t);
				}
			});
	
	Observable<Long> dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
			.doOnSubscribe(new Consumer<Disposable>() {

				@Override
				public void accept(Disposable t) throws Exception {
					System.out.println("--> DataSource is subscribe!");
				}
			}).doOnNext(new Consumer<Long>() {

				@Override
				public void accept(Long t) throws Exception {
					System.out.println("--> DataSource emitter: " + t);
				}
			});
	
	dataSource.window(openingIndicator, new Function<Long, Observable<Long>>() {

				@Override
				public Observable<Long> apply(Long t) throws Exception {
					System.out.println("--> apply(2): " + t);
					return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer<Disposable>() {

						@Override
						public void accept(Disposable t) throws Exception {
							System.out.println("--> closingIndicator is subscribe!");
						}
					});
				}
			}).subscribe(new Consumer<Observable<Long>>() {

				@Override
				public void accept(Observable<Long> t) throws Exception {
					System.out.println("-------------------> new window data");
					t.subscribe(new Consumer<Long>() {

						@Override
						public void accept(Long t) throws Exception {
							System.out.println("--> accept(2): " + t);
						}
					});
				}
			});

输出:

--> DataSource is subscribe!
--> openingIndicator is subscribe!
--> openingIndicator emitter: 1
--> DataSource emitter: 1
-------------------> new window data
--> apply(2): 1
--> closingIndicator is subscribe!
--> openingIndicator emitter: 2
--> DataSource emitter: 2
-------------------> new window data
--> apply(2): 2
--> closingIndicator is subscribe!
--> accept(2): 2
--> accept(2): 2
--> openingIndicator emitter: 3
--> DataSource emitter: 3
-------------------> new window data
--> apply(2): 3
--> closingIndicator is subscribe!
--> accept(2): 3
--> accept(2): 3
--> accept(2): 3
--> DataSource emitter: 4
--> openingIndicator emitter: 4
--> accept(2): 4
--> accept(2): 4
-------------------> new window data
--> apply(2): 4
--> closingIndicator is subscribe!
--> DataSource emitter: 5
--> accept(2): 5
--> accept(2): 5
--> openingIndicator emitter: 5

Javadoc: window(openingIndicator, closingIndicator) (opens new window)
Javadoc: window(openingIndicator, closingIndicator,bufferSize) (opens new window)

# 1.3 window(count)

这个 window 的方法立即打开它的第一个窗口。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了 onErroronCompleted 通知它也会关闭当前窗口。

这种 window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始 Observable发射的数据是 一一对应 的。

img-window(count)

实例代码:

	// 3. window(count)
	// 以count为缓存大小收集的不重叠的Observables对象,接受的数据与原数据彼此对应
	Observable.range(1, 20)
		.window(5)	// 设置缓存大小为5
		.subscribe(new Consumer<Observable<Integer>>() {

			@Override
			public void accept(Observable<Integer> t) throws Exception {
				System.out.println("--------------> new data window");
				t.subscribe(new Consumer<Integer>() {
					@Override
					public void accept(Integer t) throws Exception {
						System.out.println("--> accept window(3): " + t);
					}
				});
			}
		});

输出:

--------------> new data window
--> accept window(3): 1
--> accept window(3): 2
--> accept window(3): 3
--> accept window(3): 4
--> accept window(3): 5
--------------> new data window
--> accept window(3): 6
--> accept window(3): 7
--> accept window(3): 8
--> accept window(3): 9
--> accept window(3): 10
--------------> new data window
--> accept window(3): 11
--> accept window(3): 12
--> accept window(3): 13
--> accept window(3): 14
--> accept window(3): 15
--------------> new data window
--> accept window(3): 16
--> accept window(3): 17
--> accept window(3): 18
--> accept window(3): 19
--> accept window(3): 20

Javadoc: window(count) (opens new window)

# 1.4 window(count, skip)

这个 window 的方法立即打开它的第一个窗口。原始Observable每发射 skip 项数据它就打开 一个新窗口(例如,如果 skip 等于3,每到第三项数据,它会创建一个新窗口)。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable 收到了onErroronCompleted 通知它也会关闭当前窗口。

img-window(count,	skip)

解析: window 一开始打开一个 window,每发射 skip 项数据就会打开一个 window 独立收集 原始数据,当 window 收集了 count 个数据就会关闭,开启另外一个。当原始Observable发送了onError或者onCompleted通知也会关闭当前窗口。

  • skip = count: 会依次顺序接受原始数据,同window(count)
  • skip > count: 两个窗口可能会有 skip-count 项数据丢失
  • skip < count: 两个窗口可能会有 count-skip 项数据重叠

实例代码:

	// 4. window(count,skip)
	// window一开始打开一个window,每发射skip项数据就会打开一个window独立收集原始数据
	// 当window收集了count个数据就会关闭window,开启另外一个。
	// 当原始Observable发送了onError 或者 onCompleted 通知也会关闭当前窗口。
	// 4.1 skip = count: 会依次顺序接受原始数据,同window(count)
	Observable.range(1, 10)
		.window(2, 2)	// skip = count, 数据会依次顺序输出
		.subscribe(new Consumer<Observable<Integer>>() {

			@Override
			public void accept(Observable<Integer> t) throws Exception {
				
				t.observeOn(Schedulers.newThread())
					.subscribe(new Consumer<Integer>() {
	
						@Override
						public void accept(Integer t) throws Exception {
							System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId());
						}
					});
			}
		});
	
	// 4.2 skip > count: 两个窗口可能会有 skip-count 项数据丢失
	Observable.range(1, 10)
		.window(2, 3)	// skip > count, 数据会存在丢失
		.subscribe(new Consumer<Observable<Integer>>() {

			@Override
			public void accept(Observable<Integer> t) throws Exception {
				
				t.observeOn(Schedulers.newThread())
					.subscribe(new Consumer<Integer>() {
	
						@Override
						public void accept(Integer t) throws Exception {
							System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId());
						}
					});
			}
		});
	
	// 4.3 skip < count: 两个窗口可能会有 count-skip 项数据重叠
	Observable.range(1, 10)
		.window(3, 2)	// skip < count, 数据会重叠
		.subscribe(new Consumer<Observable<Integer>>() {

			@Override
			public void accept(Observable<Integer> t) throws Exception {
				
				t.observeOn(Schedulers.newThread())
					.subscribe(new Consumer<Integer>() {
	
						@Override
						public void accept(Integer t) throws Exception {
							System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId());
						}
					});
			}
		});

输出:

--> accept window(4-1): 1 , ThreadID: 11
--> accept window(4-1): 2 , ThreadID: 11
--> accept window(4-1): 4 , ThreadID: 12
--> accept window(4-1): 3 , ThreadID: 11
--> accept window(4-1): 5 , ThreadID: 12
--> accept window(4-1): 6 , ThreadID: 12
--> accept window(4-1): 7 , ThreadID: 13
--> accept window(4-1): 8 , ThreadID: 13
--> accept window(4-1): 9 , ThreadID: 13
--> accept window(4-1): 10 , ThreadID: 14
--> accept window(4-2): 1 , ThreadID: 15
--> accept window(4-2): 2 , ThreadID: 15
--> accept window(4-2): 4 , ThreadID: 16
--> accept window(4-2): 5 , ThreadID: 16
--> accept window(4-2): 7 , ThreadID: 17
--> accept window(4-2): 8 , ThreadID: 17
--> accept window(4-2): 10 , ThreadID: 18
--> accept window(4-3): 1 , ThreadID: 19
--> accept window(4-3): 2 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 20
--> accept window(4-3): 4 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 21
--> accept window(4-3): 6 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 22
--> accept window(4-3): 8 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 23
--> accept window(4-3): 10 , ThreadID: 23

Javadoc: window(count, skip) (opens new window)

# 1.5 window(timespan, TimeUnit)

这个 window 的方法立即打开它的第一个窗口收集数据。每当过了 timespan 这么长的时间段它就关闭当前窗口并打开一个新窗口(时间单位是 unit ,可选在调度器 scheduler 上执行)收集数据。如果从原始 Observable 收到了 onError 或 onCompleted 通知它也会关闭当前窗口。

这种 window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

实例代码:

	// 5. window(long timespan, TimeUnit unit)
	// 每当过了 timespan 的时间段,它就关闭当前窗口并打开另一个新window收集数据
	Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
		.window(2, TimeUnit.SECONDS)							// 间隔2秒关闭当前 window 并打开一个新 window 收集数据
	//	.window(2, TimeUnit.SECONDS, Schedulers.newThread())	// 指定在 newThread 线程中
		.subscribe(new Consumer<Observable<Long>>() {

			@Override
			public void accept(Observable<Long> t) throws Exception {
				t.observeOn(Schedulers.newThread())
					.subscribe(new Consumer<Long>() {
	
							@Override
							public void accept(Long t) throws Exception {
								System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() );
							}
						});
			}
		});

输出:

--> accept window(5): 1 , ThreadID: 11
--> accept window(5): 2 , ThreadID: 11
--> accept window(5): 3 , ThreadID: 11
--> accept window(5): 4 , ThreadID: 14
--> accept window(5): 5 , ThreadID: 14
--> accept window(5): 6 , ThreadID: 15
--> accept window(5): 7 , ThreadID: 16
--> accept window(5): 8 , ThreadID: 16
--> accept window(5): 9 , ThreadID: 17
--> accept window(5): 10 , ThreadID: 17

Javadoc: window(timespan, TimeUnit) (opens new window)
Javadoc: window(timespan, TimeUnit, scheduler) (opens new window)

# 1.6 window(timespan, TimeUnit, count)

这个 window 的方法立即打开它的第一个窗口。这个变体是 window(count) 和 window(timespan, unit[, scheduler]) 的结合,每当过了 timespan 的时长或者当前窗口收到了 count 项数据,它就关闭当前窗口并打开另一个。如果从原始 Observable收到了 onErroronCompleted 通知它也会关闭当前窗口。

这种window方法发射 一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

img-window(timespan, TimeUnit, count)

实例代码:

	// 6. window(long timespan, TimeUnit unit, long count)
	// 每当过了timespan的时间段或者当前窗口收到了count项数据,它就关闭当前window并打开另一个window收集数据
	Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS)
		.window(2, TimeUnit.SECONDS, 5)	// 每隔2秒关闭当前收集数据的window并开启一个window收集5项数据
	//	.window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 )	// 指定在 newThread 线程中
		.subscribe(new Consumer<Observable<Long>>() {

			@Override
			public void accept(Observable<Long> t) throws Exception {
				t.observeOn(Schedulers.newThread())
					.subscribe(new Consumer<Long>() {

							@Override
							public void accept(Long t) throws Exception {
								System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() );
							}
						});
			}
		});

输出:

--> accept window(6): 1 , ThreadID: 11
--> accept window(6): 2 , ThreadID: 11
--> accept window(6): 3 , ThreadID: 11
--> accept window(6): 4 , ThreadID: 11
--> accept window(6): 5 , ThreadID: 11
--> accept window(6): 6 , ThreadID: 14
--> accept window(6): 7 , ThreadID: 14
--> accept window(6): 8 , ThreadID: 14
--> accept window(6): 9 , ThreadID: 14
--> accept window(6): 10 , ThreadID: 14
--> accept window(6): 11 , ThreadID: 15
--> accept window(6): 12 , ThreadID: 15

Javadoc: window(timespan, TimeUnit, count) (opens new window)
Javadoc: window(timespan, TimeUnit, scheduler, count) (opens new window)

# 1.7 window(timespan, timeskip, TimeUnit)

这个 window 的方法立即打开它的第一个窗口。随后每当过了 timeskip 的时长就打开一个新窗口(时间单位是 unit ,可选在调度器 scheduler 上执行),当窗口打开的时长达 到 timespan ,它就关闭当前打开的窗口。如果从原始Observable收到 了 onError 或 onCompleted 通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的 timeskiptimespan 的值。

img-window(timespan,timeskip, TimeUnit) 解析: 在每一个 timeskip 时期内都创建一个新的 window,然后独立收集 timespan 时间段的原始Observable发射的每一项数据。注意:因为每个 window 都是独立接收数据,当接收数据的时间与创建新 window 的时间不一致时会有数据项重复,丢失等情况。

  • skip = timespan: 会依次顺序接受原始数据,同window(count)
  • skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失
  • skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠

实例代码:

		// 7. window(long timespan, long timeskip, TimeUnit unit)
		// 在每一个timeskip时期内都创建一个新的window,然后独立收集timespan时间段的原始Observable发射的每一项数据,
		// 如果timespan长于timeskip,它发射的数据包将会重叠,因此可能包含重复的数据项。
		// 7.1 skip = timespan: 会依次顺序接受原始数据,同window(count)
		Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
			.window(1, 1, TimeUnit.SECONDS)								// 设置每秒创建一个window,收集2秒的数据
		//	.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())		// 指定在 newThread 线程中
			.subscribe(new Consumer<Observable<Long>>() {

				@Override
				public void accept(Observable<Long> t) throws Exception {
					t.observeOn(Schedulers.newThread())
						.subscribe(new Consumer<Long>() {
	
							@Override
							public void accept(Long t) throws Exception {
								System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId());
							}
						});
				}
			});
		
		// 7.2 skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失
		Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
			.window(1, 2, TimeUnit.SECONDS)								// 设置每秒创建一个window,收集2秒的数据
		//	.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())		// 指定在 newThread 线程中
			.subscribe(new Consumer<Observable<Long>>() {

				@Override
				public void accept(Observable<Long> t) throws Exception {
					t.observeOn(Schedulers.newThread())
						.subscribe(new Consumer<Long>() {
	
							@Override
							public void accept(Long t) throws Exception {
								System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId());
							}
						});
				}
			});
		
		// 7.3 skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠
		Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
			.window(2, 1, TimeUnit.SECONDS)								// 设置每秒创建一个window,收集2秒的数据
		//	.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())		// 指定在 newThread 线程中
			.subscribe(new Consumer<Observable<Long>>() {

				@Override
				public void accept(Observable<Long> t) throws Exception {
					t.observeOn(Schedulers.newThread())
						.subscribe(new Consumer<Long>() {
	
							@Override
							public void accept(Long t) throws Exception {
								System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId());
							}
						});
				}
			});

输出:

--> accept window(7-1): 1 , ThreadID: 11
--> accept window(7-1): 2 , ThreadID: 11
--> accept window(7-1): 3 , ThreadID: 14
--> accept window(7-1): 4 , ThreadID: 15
--> accept window(7-1): 5 , ThreadID: 17
----------------------------------------------------------------------
--> accept window(7-2): 1 , ThreadID: 11
--> accept window(7-2): 3 , ThreadID: 14
--> accept window(7-2): 5 , ThreadID: 15
----------------------------------------------------------------------
--> accept window(7-3): 1 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 17

Javadoc: window(timespan, timeskip, TimeUnit) (opens new window)
Javadoc: window(timespan, timeskip, TimeUnit, scheduler) (opens new window)

# 2. GroupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 的一个子序列。

RxJava实现了 groupBy 操作符。它返回Observable的一个特殊子类 GroupedObservable ,实现了GroupedObservable 接口的对象有一个额外的方法 getKey ,这个 Key 用于将数据分组到指定的Observable。有一个版本的 groupBy 允许你传递一个变换函数,这样它可以在发射结果 GroupedObservable 之前改变数据项。

如果你取消订阅一个 GroupedObservable ,那个 Observable 将会终止。如果之后原始的 Observable又发射了一个与这个Observable的Key匹配的数据, groupBy 将会为这个 Key 创建一个新的 GroupedObservable。

img-GroupBy

注意: groupBy 将原始 Observable 分解为一个发射多个 GroupedObservable 的Observable,一旦有订阅,每个 GroupedObservable 就开始缓存数据。因此,如果你忽略这 些 GroupedObservable 中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略 GroupedObservable 。你应该使用像 take(0) 这样会丢弃自己的缓存的操作符。

# 2.1 groupBy(keySelector)

GroupBy 操作符将原始 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 数据序列的一个子序列。哪个数据项由哪一个 Observable 发射是由一个函数判定的,这个函数给每一项指定一个KeyKey相同的数据会被同一个 Observable 发射。还有一个 delayError 参数的方法,指定是否延迟 Error 通知的Observable。

实例代码:

	// 1. groupBy(keySelector)
	// 将原始数据处理后加上分组tag,通过GroupedObservable发射分组数据
	Observable.range(1, 10)
		.groupBy(new Function<Integer, String>() {

			@Override
			public String apply(Integer t) throws Exception {
				// 不同的key将会产生不同分组的Observable
				return t % 2 == 0 ? "Even" : "Odd"; // 将数据奇偶数进行分组,
			}
		}).observeOn(Schedulers.newThread())
			.subscribe(new Consumer<GroupedObservable<String, Integer>>() {

				@Override
				public void accept(GroupedObservable<String, Integer> grouped) throws Exception {
					// 得到每个分组数据的的Observable
					grouped.subscribe(new Consumer<Integer>() {

						@Override
						public void accept(Integer t) throws Exception {
							// 得到数据
							System.out.println("--> accept groupBy(1):   groupKey: " + grouped.getKey() + ", value: " + t);
						}
					});
				}
			});

输出:

--> accept groupBy(1):   groupKey: Odd, value: 1
--> accept groupBy(1):   groupKey: Odd, value: 3
--> accept groupBy(1):   groupKey: Odd, value: 5
--> accept groupBy(1):   groupKey: Odd, value: 7
--> accept groupBy(1):   groupKey: Odd, value: 9
--> accept groupBy(1):   groupKey: Even, value: 2
--> accept groupBy(1):   groupKey: Even, value: 4
--> accept groupBy(1):   groupKey: Even, value: 6
--> accept groupBy(1):   groupKey: Even, value: 8
--> accept groupBy(1):   groupKey: Even, value: 10

Javadoc: groupBy(keySelector) (opens new window)
Javadoc: groupBy(keySelector, delayError) (opens new window)

# 2.2 groupBy(keySelector, valueSelector)

GroupBy 操作符通过 keySelector 将原始 Observable 按照 Key 分组,产生不同的 Observable,再通过 valueSelector 对原始的数据进行处理,在发送每一个被处理完成的数据。

实例代码:

	// 2. groupBy(Function(T,R),Function(T,R))
	// 第一个func对原数据进行分组处理(仅仅分组添加key,不处理原始数据),第二个func对原始数据进行处理
	Observable.range(1, 10)
		.groupBy(new Function<Integer, String>() {

			@Override
			public String apply(Integer t) throws Exception {
				// 对原始数据进行分组处理
				return t % 2 == 0 ? "even" : "odd";
			}
		},new Function<Integer, String>() {

			@Override
			public String apply(Integer t) throws Exception {
				// 对原始数据进行数据转换处理
				return t + " is " + (t % 2 == 0 ? "even" : "odd");
			}
			}).observeOn(Schedulers.newThread()).subscribe(new Consumer<GroupedObservable<String, String>>() {

				@Override
				public void accept(GroupedObservable<String, String> grouped) throws Exception {
					grouped.subscribe(new Consumer<String>() {

						@Override
						public void accept(String t) throws Exception {
							// 接受最终的分组处理以及原数据处理后的数据
							System.out.println("--> accept groupBy(2):   groupKey = " + grouped.getKey()
									+ ", value = " + t);
						}
					});
				}
			});

输出:

--> accept groupBy(2):   groupKey = odd, value = 1 is odd
--> accept groupBy(2):   groupKey = odd, value = 3 is odd
--> accept groupBy(2):   groupKey = odd, value = 5 is odd
--> accept groupBy(2):   groupKey = odd, value = 7 is odd
--> accept groupBy(2):   groupKey = odd, value = 9 is odd
--> accept groupBy(2):   groupKey = even, value = 2 is even
--> accept groupBy(2):   groupKey = even, value = 4 is even
--> accept groupBy(2):   groupKey = even, value = 6 is even
--> accept groupBy(2):   groupKey = even, value = 8 is even
--> accept groupBy(2):   groupKey = even, value = 10 is even

Javadoc: groupBy(keySelector, valueSelector) (opens new window)
Javadoc: groupBy(keySelector, valueSelector, delayError) (opens new window)
Javadoc: groupBy(keySelector, valueSelector, delayError, bufferSize) (opens new window)

# 3. Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果。

# 3.1 scan(accumulator)

Scan 操作符对原始 Observable 发射的第一项数据应用一个函数,然后将那个函数的结果作为 自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做 accumulator

img-scan(accumulator) 解析: 先发送原始数据第一项数据,然后将这个数据与下一个原始数据作为参数传递给 accumulator, 处理后发送这个数据,并与下一个原始数据一起传递到下一次 accumulator ,直到数据序列结束。类似一个累积的过程

实例代码:

		// 1. scan(BiFunction(Integer sum, Integer t2)) 
		// 接受数据序列,从第二个数据开始,每次会将上次处理数据和现在接受的数据进行处理后发送
		Observable.range(1, 10)
			.scan(new BiFunction<Integer, Integer, Integer>() {
				
				@Override
				public Integer apply(Integer LastItem, Integer item) throws Exception {
					System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item);
					return LastItem + item; // 实现求和操作
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept scan(1): " + t);
					
				}
			});

输出:

--> accept scan(1): 1
--> apply: LastItem = 1, CurrentItem = 2
--> accept scan(1): 3
--> apply: LastItem = 3, CurrentItem = 3
--> accept scan(1): 6
--> apply: LastItem = 6, CurrentItem = 4
--> accept scan(1): 10
--> apply: LastItem = 10, CurrentItem = 5
--> accept scan(1): 15
--> apply: LastItem = 15, CurrentItem = 6
--> accept scan(1): 21
--> apply: LastItem = 21, CurrentItem = 7
--> accept scan(1): 28
--> apply: LastItem = 28, CurrentItem = 8
--> accept scan(1): 36
--> apply: LastItem = 36, CurrentItem = 9
--> accept scan(1): 45
--> apply: LastItem = 45, CurrentItem = 10
--> accept scan(1): 55

Javadoc: scan(accumulator) (opens new window)

# 3.2 scan(initialValue, accumulator)

有一个 scan 操作符的方法,你可以传递一个种子值给累加器函数的第一次调用(Observable 发射的第一项数据)。如果你使用这个版本,scan 将发射种子值作为自己的第一项数据。

注意: 传递 null 作为种子值与不传递是不同的,null 种子值是合法的。

img-scan(initialValue, accumulator)

解析: 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据。

实例代码:

	// 2. scan(R,Func2)
	// 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据
	Observable.range(1, 10)
		.scan(100, new BiFunction<Integer, Integer, Integer>() { 	// 指定初始种子数据为100

			@Override
			public Integer apply(Integer lastValue, Integer item) throws Exception {
				System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item);
				return lastValue + item; 	// 指定初值的求和操作
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept sacn(2) = " + t);
			}
		});

输出:

--> accept sacn(2) = 100
--> apply: lastValue = 100, item = 1
--> accept sacn(2) = 101
--> apply: lastValue = 101, item = 2
--> accept sacn(2) = 103
--> apply: lastValue = 103, item = 3
--> accept sacn(2) = 106
--> apply: lastValue = 106, item = 4
--> accept sacn(2) = 110
--> apply: lastValue = 110, item = 5
--> accept sacn(2) = 115
--> apply: lastValue = 115, item = 6
--> accept sacn(2) = 121
--> apply: lastValue = 121, item = 7
--> accept sacn(2) = 128
--> apply: lastValue = 128, item = 8
--> accept sacn(2) = 136
--> apply: lastValue = 136, item = 9
--> accept sacn(2) = 145
--> apply: lastValue = 145, item = 10
--> accept sacn(2) = 155

注意: 这个操作符默认不在任何特定的调度器上执行。
Javadoc: scan(initialValue, accumulator) (opens new window)

# 4. Cast

Cast 将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是 map 的一个特殊版本。转换失败会有Error通知。

# 4.1 cast(clazz)

将原始数据强制转换为指定的 clazz 类型,如果转换成功发送转换后的数据,否则发送Error通知。一般用于 数据类型的转换数据实际类型的检查(多态)

img-cast(clazz)

实例代码:

	//	cast(clazz) 
	// 1. 基本类型转换
	Observable.range(1, 5)
		.cast(Integer.class)
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("-- accept cast(1): " + t);
			}
		});
			
	// 2. 转换失败通知
	System.out.println("------------------------------------");
	Observable.just((byte)1)
		.cast(Integer.class)
		.subscribe(new Observer<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe(2)");
			}

			@Override
			public void onNext(Integer t) {
				System.out.println("--> onNext(2) = " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(2) = " + e.toString());
			}

			@Override
			public void onComplete() {
				System.out.println("--> onComplete(2)");
			}
		});
	
	System.out.println("------------------------------------");
	class Animal{
		public int id;
	}

	class Dog extends Animal{
		public String name;

		@Override
		public String toString() {
			return "Dog [name=" + name + ", id=" + id + "]";
		}
	}
	
	//  3. 多态转换,检查数据的实际类型
	Animal animal = new Dog();
	animal.id = 666;
	Observable.just(animal)
		.cast(Dog.class)
		.subscribe(new Consumer<Dog>() {

			@Override
			public void accept(Dog t) throws Exception {
				System.out.println("--> accept cast(3): " + t);
			}
		});

输出:

-- accept cast(1): 1
-- accept cast(1): 2
-- accept cast(1): 3
-- accept cast(1): 4
-- accept cast(1): 5
------------------------------------
--> onSubscribe(2)
--> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer
------------------------------------
--> accept cast(3): Dog [name=null, id=666]

Javadoc: cast(clazz) (opens new window)

# 小结:

在实际开发场景中,比如网络数据请求场景,原始的数据格式或类型可能并不满足开发的实际需要,需要对数据进行处理。数据变换操作在实际开发场景中还是非常多的,所以数据的变换是非常重要的。使用Rx的数据变换操作可以轻松完成大多数场景的数据变换操作,提高开发效率。

实例代码: