6. Observable的数据过滤(二)

2019/12/30 Rxjava2

# 6. Filter

只发射通过了函数过滤的数据项。

img-filter

实例代码:

	// filter(Predicate<? super Integer> predicate)
	// 验证数据,决定是否发射数据
	Observable.range(1, 10)
			.filter(new Predicate<Integer>() {

				@Override
				public boolean test(Integer t) throws Exception {
					// 进行测试验证是否需要发射数据
					return t > 5 ? true : false;
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept filter: " + t);
				}
			});

输出:

--> accept filter: 6
--> accept filter: 7
--> accept filter: 8
--> accept filter: 9
--> accept filter: 10

Javadoc: filter(predicate) (opens new window)

# 7. Frist

只发射第一项或者满足某个条件的第一项数据。如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用 First 操作符。

Frist 操作符有以下几种操作:

# 7.1 firstElement()

只发射第一个数据,当数据存在的情况。

img-firstElement()

实例代码:

	// 1. firstElement()
	// 只发射第一个数据
	Observable.range(1, 10)
		.firstElement()
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept firstElement(1): "  + t);
			}
		});

输出:

--> accept firstElement(1): 1

Javadoc: firstElement() (opens new window)

# 7.2 first(defaultItem)

first(defaultItem)firstElement() 类似,但是在Observagle没有发射任何数据时发射一个你在参数中指定的 defaultItem 默认值。

img-first(defaultItem)

实例代码:

	// 2. first(Integer defaultItem)
	// 发射第一个数据项,如果没有数据项,发送默认的defaultItem
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).first(999) // 没有数据发送时,发送默认值999
	  .subscribe(new Consumer<Integer>() {

		  @Override
		  public void accept(Integer t) throws Exception {
			  System.out.println("--> accept first(2): " + t);
		  }
	  });

输出:

--> accept first(2): 999

Javadoc: first(defaultItem) (opens new window)

# 7.3 firstOrError()

发射第一个数据项,如果没有数据项,会发送 NoSuchElementException 通知。

img-firstOrError

实例代码:

	// 3. first(Integer defaultItem)
	// 发射第一个数据项,如果没有数据项,会有Error: NoSuchElementException
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).firstOrError() // 没有数据发送时,将会发送NoSuchElementException通知
	  .subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe: ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> accept onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> acctpt onError(3): " + e);
			}
	  });

输出:

--> onSubscribe: 
--> acctpt onError(3): java.util.NoSuchElementException

Javadoc: firstOrError() (opens new window)

# 8. Single

singlefirst 类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException 的通知。

Single 有以下几种操作:

# 8.1 singleElement()

发射单例数据,超过一个就会发送 NoSuchElementException 通知。

img-singleElement

实例代码:

	// 1.singleElement()
	// 发射单例数据,超过一个就会NoSuchElementException	
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			emitter.onNext(2);
			emitter.onComplete();
		}
	}).singleElement() // 发送单个数据,大于1项数据就会有Error通知
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept singleElement(1): " + t);
			}
	  },new Consumer<Throwable>() {

		@Override
		public void accept(Throwable t) throws Exception {
			System.out.println("--> OnError(1): " + t);
		}
	});

输出:

--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!

Javadoc: singleElement() (opens new window)

# 8.2 single(defaultItem)

发射单例数据,没有接收到数据项则发送指定默认 defaultItem 数据。

img-single(defaultItem)

实例代码:

	// 2. single(Integer defaultItem)
	// 发射单例数据,没有数据项发送指定默认defaultItem
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).single(999) // 没有接受到数据则发送默认数据999
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept single(2): " + t);
			}
	  });

输出:

--> accept single(2): 999

Javadoc: single(defaultItem) (opens new window)

# 8.3 singleOrError()

发射一个单例的数据,如果数据源没有数据项,则发射一个 NoSuchElementException 通知。

img-singleOrError

实例代码:

	// 3.singleOrError()
	// 发射一个单例的数据,如果数据源 没有数据项,则发送一个NoSuchElementException异常通知
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).singleOrError() // 如果没有数据项发送,则发送一个NoSuchElementException异常通知
	  .subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe(3): ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

输出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: singleOrError() (opens new window)

# 9. ElementAt

ElementAt 操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

ElementAt 操作符有以下几种操作:

# 9.1 elementAt(index)

发射索引位置第 index 项数据(从0开始计数),如果数据不存在,会 IndexOutOfBoundsException 异常。

img-elementAt(index)

实例代码:

	// 1. elementAt(long index)
	// 指定发射第N项数据(从0开始计数),如果数据不存在,会IndexOutOfBoundsException异常
	Observable.range(1, 10)
		.elementAt(5) // 发射数据序列中索引为5的数据项,索引从0开始
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept ElementAt(1): " + t);
			}
		});

输出:

--> accept ElementAt(1): 6

Javadoc: elementAt(index) (opens new window)

# 9.2 elementAt(index, defaultItem)

发射索引位置第 index 项数据(从0开始计数),如果数据不存在,发送默认 defaultItem 数据。

img-elementAt(index, defaultItem)

实例代码:

	// 2. elementAt(long index, Integer defaultItem)
	// 指定发射第N项数据(从0开始计数),如果数据不存在,发送默认defaultItem
	Observable.range(1, 10)
		.elementAt(20, 0) // 发射索引第20项数据,不存在此项数据时,发送默认数据0
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept elementAt(2): " + t);
			}
		});

输出:

--> accept elementAt(2): 0

Javadoc: elementAt(index, defaultItem) (opens new window)

# 9.3 elementAtOrError(index)

发射索引位置第 index 项数据(从0开始计数),如果指定发射的数据不存在,会发射NoSuchElementException 异常通知。

img-elementAtOrError(index)

实例代码:

	// 3. elementAtOrError(long index)
	// 如果指定发射的数据不存在,会抛出NoSuchElementException
	Observable.range(1, 10)
		.elementAtOrError(50) // 发射索引为50的数据,不存在则发送NoSuchElementException异常通知
		.subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe(3): ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

输出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: elementAtOrError(index) (opens new window)

# 10. ignoreElements

不发射任何数据,只发射Observable的终止通知。

IgnoreElements 操作符抑制原始Observable发射的所有数据,只允许它的终止通知 (onError 或 onCompleted )通过。

img-ignoreElements

解析: 如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用 ignoreElements 操作符,它会确保永远不会调用观察者的 onNext() 方法。

实例代码:

	// ignoreElements()
	// 只接受onError或onCompleted通知,拦截onNext事件(不关心发射的数据,只希望在成功或者失败的时候收到通知)
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			//	int i = 1/0;
			emitter.onComplete();
		}
	}).ignoreElements()
	  .subscribe(new CompletableObserver() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError: " + e);
			}

			@Override
			public void onComplete() {
				System.out.println("--> onComplete");
			}
	  });

输出:

--> onSubscribe
--> onComplete

Javadoc: ignoreElements() (opens new window)

# 11. Last

只发射最后一项(或者满足某个条件的最后一项)数据。

如果你只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用 Last 操作符。

Last 有以下几种操作:

# 11.1 lastElement()

只发射最后一项数据,使用没有参数的 last 操作符,如果Observable中没有数据发送,则同样没有数据发送。

img-lastElement

实例代码:

	// 1. lastElement()
	// 接受最后一项数据
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			emitter.onNext(2);
			emitter.onNext(3);
			emitter.onComplete();
		}
	}).lastElement() // 存在数据发送的话,即发射最后一项数据,否则没有数据发射
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept lastElement(1): " + t);
			}
		});

输出:

--> accept lastElement(1): 3

Javadoc: lastElement() (opens new window)

# 11.2 last(defaultItem)

只发射最后一项数据,如果Observable中没有数据发送,则发送指定的默认值 defaultItem

img-last(defaultItem)

实例代码:

	// 2. last(Integer defaultItem)
	// 接受最后一项数据,如果没有数据发送,发送默认数据:defaultItem
	Observable.range(0, 0)
		.last(999) // 接受最后一项数据,没有数据则发送默认数据999
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept: last(2): " + t);
			}
		});

输出:

--> accept: last(2): 999

Javadoc: last(defaultItem) (opens new window)

# 11.3 lastOrError()

接受最后一项数据,如果没有数据发送,抛出 NoSuchElementException 异常通知。

img-lastOrError

实例代码:

	// 3. lastOrError()
	// 接受最后一项数据,如果没有数据发送,抛出onError: NoSuchElementException
	Observable.range(0, 0)
		.lastOrError() // 接受最后一项数据,如果没有数据,则反射NoSuchElementException异常通知
		.subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe: ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3)");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

输出:

--> onSubscribe: 
--> onError(3): java.util.NoSuchElementException

Javadoc: lastOrError() (opens new window)

# 12. Take

使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

Take 操作符有以下几种操作:

# 12.1 take(count)

如果你对一个Observable使用 take(n) 操作符,而那个Observable发射的数据少于N项,那么 take 操作生成的Observable不会抛异常或发射 onError 通知,在完成前它只会发射相同的少量数据。

img-take(count)

实例代码:

	// 1. take(long count)
	// 返回前count项数据
	Observable.range(1, 100)
		.take(5) // 返回前5项数据
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept take(1): " + t);
			}
		});

输出:

--> accept take(1): 1
--> accept take(1): 2
--> accept take(1): 3
--> accept take(1): 4
--> accept take(1): 5

Javadoc: take(count) (opens new window)

# 12.2 take(timeout, TimeUnit)

取一定时间间隔内的数据,有可选参数 scheduler 指定线程调度器。

img-Take(timeout, TimeUnit)

实例代码:

	// 2. take(long time, TimeUnit unit,[Scheduler] scheduler)
	//  取一定时间间隔内的数据,可选参数scheduler指定线程调度器
	Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS)
		.take(5, TimeUnit.SECONDS) // 返回前5秒的数据项
		.subscribe(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> accept take(2): " + t);
			}
		});

输出:

--> accept take(2): 1
--> accept take(2): 2
--> accept take(2): 3
--> accept take(2): 4
--> accept take(2): 5

Javadoc: take(timeout, TimeUnit) (opens new window)
Javadoc: take(timeout, TimeUnit, Scheduler) (opens new window)

# 13. TakeLast

使用 TakeLast 操作符修改原始Observable,你可以只发射Observable发射的后N项数据,忽略前面的数据。

takeLast 的这个变体默认在 computation 调度器上执行,但是你可以使用第三个参数指定其它的调度器。

TakeLast 一般有下面几种操作:

# 13.1 takeLast(count)

使用 takeLast(count) 操作符,你可以只发射原始Observable发射的后 count 项数据(或者原始Observable发射onCompleted() 前的 count 项数据),忽略之前的数据。 注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

img-takeLast(count)

实例代码:

    // 1. takeLast(int count)
    // 接受Observable数据发射完成前的Count项数据, 忽略前面的数据
    Observable.range(1, 10)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(1): ");
                }
            })
            .takeLast(5) // 发送数据发射完成前的5项数据
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept takeLast(1): " + t);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
--> onCompleted(1): 
--> accept takeLast(1): 6
--> accept takeLast(1): 7
--> accept takeLast(1): 8
--> accept takeLast(1): 9
--> accept takeLast(1): 10

Javadoc: takeLast(count) (opens new window)

# 13.2 takeLast(time, TimeUnit)

还有一个 takeLast 变体接受一个时长而不是数量参数。它会发射在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意: 这会延迟原始Observable发射的任何数据项,直到它全部完成。

img-takeLast(time, TimeUnit)

实例代码:

    // 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可选参数 scheduler:指定工作调度器  delayError:延迟Error通知  bufferSize:指定缓存大小
    // 接受Observable数据发射完成前指定时间间隔发射的数据项
    Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(2): ");
                }
            })
            .takeLast(3, TimeUnit.SECONDS) // 发送数据发射完成前3秒时间段内的数据
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(2): " + t);
                }
            });                                                     

输出:

--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
--> onCompleted(2): 
--> accept takeLast(2): 3
--> accept takeLast(2): 4
--> accept takeLast(2): 5

Javadoc: takeLast(long time, TimeUnit unit) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) (opens new window)

# 13.3 takeLast(count, time, TimeUnit)

接受 Observable 发射完成前 time 时间段内收集 count 项数据并发射。

img-takeLast(count, time, TimeUnit)

示例代码:

    // 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可选参数 scheduler:指定工作调度器  delayError:延迟Error通知  bufferSize:指定缓存大小
    // 接受Observable数据发射完成前time时间段内收集count项数据并发射
    Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(3): ");
                }
            })
            .takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原数据发射完成前500毫秒内接受2项数据
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(3): " + t);
                }
            });

输出:

--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
--> onCompleted(3): 
--> accept takeLast(3): 9
--> accept takeLast(3): 10

Javadoc: takeLast(long count, long time, TimeUnit unit) (opens new window)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler) (opens new window)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) (opens new window)

# 14. OfType

ofType 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

img-OfType

示例代码:

    Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)};
    // ofType(Class clazz)
    // 过滤数据,只返回特定类型的数据
    Observable.fromArray(dataObjects)
            .ofType(Integer.class) // 过滤Integer类型的数据
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept ofType: " + t);
                }
            });

输出:

--> accept ofType: 1
--> accept ofType: 5

Javadoc: ofType(Class clazz) (opens new window)

# 小结:

数据过滤的操作符主要是过滤被观察者(Observable)发射的数据序列,按照指定的规则过滤数据项,忽略并丢弃其他的数据。实际开发场景如网络数据的过滤,数据库数据的过滤等,是开发中重要且常见的操作之一。

实例代码: