我一直使用Observable.fromEmitter()
作为Observable.create()
的绝佳替代品。最近我遇到了一些奇怪的行为,但我无法完全弄清原因。我真的很希望有一些了解背压和调度程序的人来看看这个问题。
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
这个应用程序的输出结果是:
Received 1
Received 2
...
Received 128
然后它停在128处(假定这是RxJava默认的缓冲区大小)。
如果我将fromEmitter()
中指定的模式更改为BackpressureMode.NONE
,则代码将按预期工作。如果我删除对observeOn()
的调用,它也会按预期工作。有人能解释一下这是为什么吗?
Observable.create()
在2.0中替换了fromEmitter()
。如果你想使用旧的、可怕的 create 行为,可以使用Observable.unsafeCreate()
。 - Chris Horner