# 6. Filter
只发射通过了函数过滤的数据项。
实例代码:
// filter(Predicate<? super Integer> predicate)
// 验证数据,决定是否发射数据
Observable.range(1, 10)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer t) throws Exception {
// 进行测试验证是否需要发射数据
return t > 5 ? true : false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept filter: " + t);
}
});
输出:
--> accept filter: 6
--> accept filter: 7
--> accept filter: 8
--> accept filter: 9
--> accept filter: 10
Javadoc: filter(predicate) (opens new window)
# 7. Frist
只发射第一项或者满足某个条件的第一项数据。如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用 First
操作符。
Frist
操作符有以下几种操作:
# 7.1 firstElement()
只发射第一个数据,当数据存在的情况。
实例代码:
// 1. firstElement()
// 只发射第一个数据
Observable.range(1, 10)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept firstElement(1): " + t);
}
});
输出:
--> accept firstElement(1): 1
Javadoc: firstElement() (opens new window)
# 7.2 first(defaultItem)
first(defaultItem)
与 firstElement()
类似,但是在Observagle没有发射任何数据时发射一个你在参数中指定的 defaultItem
默认值。
实例代码:
// 2. first(Integer defaultItem)
// 发射第一个数据项,如果没有数据项,发送默认的defaultItem
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).first(999) // 没有数据发送时,发送默认值999
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept first(2): " + t);
}
});
输出:
--> accept first(2): 999
# 7.3 firstOrError()
发射第一个数据项,如果没有数据项,会发送 NoSuchElementException
通知。
实例代码:
// 3. first(Integer defaultItem)
// 发射第一个数据项,如果没有数据项,会有Error: NoSuchElementException
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).firstOrError() // 没有数据发送时,将会发送NoSuchElementException通知
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe: ");
}
@Override
public void onSuccess(Integer t) {
System.out.println("--> accept onSuccess(3): " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> acctpt onError(3): " + e);
}
});
输出:
--> onSubscribe:
--> acctpt onError(3): java.util.NoSuchElementException
Javadoc: firstOrError() (opens new window)
# 8. Single
single
与 first
类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException
的通知。
Single 有以下几种操作:
# 8.1 singleElement()
发射单例数据,超过一个就会发送 NoSuchElementException
通知。
实例代码:
// 1.singleElement()
// 发射单例数据,超过一个就会NoSuchElementException
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).singleElement() // 发送单个数据,大于1项数据就会有Error通知
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept singleElement(1): " + t);
}
},new Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
System.out.println("--> OnError(1): " + t);
}
});
输出:
--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!
Javadoc: singleElement() (opens new window)
# 8.2 single(defaultItem)
发射单例数据,没有接收到数据项则发送指定默认 defaultItem
数据。
实例代码:
// 2. single(Integer defaultItem)
// 发射单例数据,没有数据项发送指定默认defaultItem
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).single(999) // 没有接受到数据则发送默认数据999
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept single(2): " + t);
}
});
输出:
--> accept single(2): 999
# 8.3 singleOrError()
发射一个单例的数据,如果数据源没有数据项,则发射一个 NoSuchElementException
通知。
实例代码:
// 3.singleOrError()
// 发射一个单例的数据,如果数据源 没有数据项,则发送一个NoSuchElementException异常通知
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).singleOrError() // 如果没有数据项发送,则发送一个NoSuchElementException异常通知
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3): ");
}
@Override
public void onSuccess(Integer t) {
System.out.println("--> onSuccess(3): " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
});
输出:
--> onSubscribe(3):
--> onError(3): java.util.NoSuchElementException
Javadoc: singleOrError() (opens new window)
# 9. ElementAt
ElementAt
操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
ElementAt 操作符有以下几种操作:
# 9.1 elementAt(index)
发射索引位置第 index
项数据(从0开始计数),如果数据不存在,会 IndexOutOfBoundsException
异常。
实例代码:
// 1. elementAt(long index)
// 指定发射第N项数据(从0开始计数),如果数据不存在,会IndexOutOfBoundsException异常
Observable.range(1, 10)
.elementAt(5) // 发射数据序列中索引为5的数据项,索引从0开始
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept ElementAt(1): " + t);
}
});
输出:
--> accept ElementAt(1): 6
Javadoc: elementAt(index) (opens new window)
# 9.2 elementAt(index, defaultItem)
发射索引位置第 index
项数据(从0开始计数),如果数据不存在,发送默认 defaultItem
数据。
实例代码:
// 2. elementAt(long index, Integer defaultItem)
// 指定发射第N项数据(从0开始计数),如果数据不存在,发送默认defaultItem
Observable.range(1, 10)
.elementAt(20, 0) // 发射索引第20项数据,不存在此项数据时,发送默认数据0
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept elementAt(2): " + t);
}
});
输出:
--> accept elementAt(2): 0
# 9.3 elementAtOrError(index)
发射索引位置第 index
项数据(从0开始计数),如果指定发射的数据不存在,会发射NoSuchElementException
异常通知。
实例代码:
// 3. elementAtOrError(long index)
// 如果指定发射的数据不存在,会抛出NoSuchElementException
Observable.range(1, 10)
.elementAtOrError(50) // 发射索引为50的数据,不存在则发送NoSuchElementException异常通知
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3): ");
}
@Override
public void onSuccess(Integer t) {
System.out.println("--> onSuccess(3): " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
});
输出:
--> onSubscribe(3):
--> onError(3): java.util.NoSuchElementException
# 10. ignoreElements
不发射任何数据,只发射Observable的终止通知。
IgnoreElements
操作符抑制原始Observable发射的所有数据,只允许它的终止通知 (onError 或 onCompleted )通过。
解析: 如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用 ignoreElements 操作符,它会确保永远不会调用观察者的 onNext() 方法。
实例代码:
// ignoreElements()
// 只接受onError或onCompleted通知,拦截onNext事件(不关心发射的数据,只希望在成功或者失败的时候收到通知)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
// int i = 1/0;
emitter.onComplete();
}
}).ignoreElements()
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
输出:
--> onSubscribe
--> onComplete
Javadoc: ignoreElements() (opens new window)
# 11. Last
只发射最后一项(或者满足某个条件的最后一项)数据。
如果你只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用 Last
操作符。
Last
有以下几种操作:
# 11.1 lastElement()
只发射最后一项数据,使用没有参数的 last
操作符,如果Observable中没有数据发送,则同样没有数据发送。
实例代码:
// 1. lastElement()
// 接受最后一项数据
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).lastElement() // 存在数据发送的话,即发射最后一项数据,否则没有数据发射
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept lastElement(1): " + t);
}
});
输出:
--> accept lastElement(1): 3
Javadoc: lastElement() (opens new window)
# 11.2 last(defaultItem)
只发射最后一项数据,如果Observable中没有数据发送,则发送指定的默认值 defaultItem
。
实例代码:
// 2. last(Integer defaultItem)
// 接受最后一项数据,如果没有数据发送,发送默认数据:defaultItem
Observable.range(0, 0)
.last(999) // 接受最后一项数据,没有数据则发送默认数据999
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept: last(2): " + t);
}
});
输出:
--> accept: last(2): 999
Javadoc: last(defaultItem) (opens new window)
# 11.3 lastOrError()
接受最后一项数据,如果没有数据发送,抛出 NoSuchElementException
异常通知。
实例代码:
// 3. lastOrError()
// 接受最后一项数据,如果没有数据发送,抛出onError: NoSuchElementException
Observable.range(0, 0)
.lastOrError() // 接受最后一项数据,如果没有数据,则反射NoSuchElementException异常通知
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe: ");
}
@Override
public void onSuccess(Integer t) {
System.out.println("--> onSuccess(3)");
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
});
输出:
--> onSubscribe:
--> onError(3): java.util.NoSuchElementException
Javadoc: lastOrError() (opens new window)
# 12. Take
使用Take
操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。
Take 操作符有以下几种操作:
# 12.1 take(count)
如果你对一个Observable使用 take(n)
操作符,而那个Observable发射的数据少于N项,那么 take
操作生成的Observable不会抛异常或发射 onError
通知,在完成前它只会发射相同的少量数据。
实例代码:
// 1. take(long count)
// 返回前count项数据
Observable.range(1, 100)
.take(5) // 返回前5项数据
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept take(1): " + t);
}
});
输出:
--> accept take(1): 1
--> accept take(1): 2
--> accept take(1): 3
--> accept take(1): 4
--> accept take(1): 5
Javadoc: take(count) (opens new window)
# 12.2 take(timeout, TimeUnit)
取一定时间间隔内的数据,有可选参数 scheduler
指定线程调度器。
实例代码:
// 2. take(long time, TimeUnit unit,[Scheduler] scheduler)
// 取一定时间间隔内的数据,可选参数scheduler指定线程调度器
Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS)
.take(5, TimeUnit.SECONDS) // 返回前5秒的数据项
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept take(2): " + t);
}
});
输出:
--> accept take(2): 1
--> accept take(2): 2
--> accept take(2): 3
--> accept take(2): 4
--> accept take(2): 5
Javadoc: take(timeout, TimeUnit) (opens new window)
Javadoc: take(timeout, TimeUnit, Scheduler) (opens new window)
# 13. TakeLast
使用 TakeLast
操作符修改原始Observable,你可以只发射Observable发射的后N项数据,忽略前面的数据。
takeLast
的这个变体默认在 computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
TakeLast 一般有下面几种操作:
# 13.1 takeLast(count)
使用 takeLast(count)
操作符,你可以只发射原始Observable发射的后 count
项数据(或者原始Observable发射onCompleted()
前的 count
项数据),忽略之前的数据。 注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。
实例代码:
// 1. takeLast(int count)
// 接受Observable数据发射完成前的Count项数据, 忽略前面的数据
Observable.range(1, 10)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept(1): " + t);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("--> onCompleted(1): ");
}
})
.takeLast(5) // 发送数据发射完成前的5项数据
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept takeLast(1): " + t);
}
});
输出:
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
--> onCompleted(1):
--> accept takeLast(1): 6
--> accept takeLast(1): 7
--> accept takeLast(1): 8
--> accept takeLast(1): 9
--> accept takeLast(1): 10
Javadoc: takeLast(count) (opens new window)
# 13.2 takeLast(time, TimeUnit)
还有一个 takeLast
变体接受一个时长而不是数量参数。它会发射在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。
注意: 这会延迟原始Observable发射的任何数据项,直到它全部完成。
实例代码:
// 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
// 可选参数 scheduler:指定工作调度器 delayError:延迟Error通知 bufferSize:指定缓存大小
// 接受Observable数据发射完成前指定时间间隔发射的数据项
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(2): " + t);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("--> onCompleted(2): ");
}
})
.takeLast(3, TimeUnit.SECONDS) // 发送数据发射完成前3秒时间段内的数据
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept takeLast(2): " + t);
}
});
输出:
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
--> onCompleted(2):
--> accept takeLast(2): 3
--> accept takeLast(2): 4
--> accept takeLast(2): 5
Javadoc: takeLast(long time, TimeUnit unit) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError) (opens new window)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) (opens new window)
# 13.3 takeLast(count, time, TimeUnit)
接受 Observable 发射完成前 time
时间段内收集 count
项数据并发射。
示例代码:
// 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
// 可选参数 scheduler:指定工作调度器 delayError:延迟Error通知 bufferSize:指定缓存大小
// 接受Observable数据发射完成前time时间段内收集count项数据并发射
Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(3): " + t);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("--> onCompleted(3): ");
}
})
.takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原数据发射完成前500毫秒内接受2项数据
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept takeLast(3): " + t);
}
});
输出:
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
--> onCompleted(3):
--> accept takeLast(3): 9
--> accept takeLast(3): 10
Javadoc: takeLast(long count, long time, TimeUnit unit) (opens new window)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler) (opens new window)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) (opens new window)
# 14. OfType
ofType
是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
示例代码:
Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)};
// ofType(Class clazz)
// 过滤数据,只返回特定类型的数据
Observable.fromArray(dataObjects)
.ofType(Integer.class) // 过滤Integer类型的数据
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept ofType: " + t);
}
});
输出:
--> accept ofType: 1
--> accept ofType: 5
# 小结:
数据过滤的操作符主要是过滤被观察者(Observable)发射的数据序列,按照指定的规则过滤数据项,忽略并丢弃其他的数据。实际开发场景如网络数据的过滤,数据库数据的过滤等,是开发中重要且常见的操作之一。
实例代码: