RxJava Observable.fromEmitter的奇怪背压行为

4

我一直使用Observable.fromEmitter()作为Observable.create()的绝佳替代品。最近我遇到了一些奇怪的行为,但我无法完全弄清原因。我真的很希望有一些了解背压和调度程序的人来看看这个问题。

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

这个应用程序的输出结果是:
Received 1
Received 2
...
Received 128

然后它停在128处(假定这是RxJava默认的缓冲区大小)。

如果我将fromEmitter()中指定的模式更改为BackpressureMode.NONE,则代码将按预期工作。如果我删除对observeOn()的调用,它也会按预期工作。有人能解释一下这是为什么吗?


这很奇怪,它不应该停止。即使在使用 toBlocking() 或在 observeOn 中使用较小的缓冲区大小时,它也会停止。我将进一步调查此问题。 - akarnokd
在 RX Java 2.0 中,Observable.fromEmitter 的等效方法是什么? - Mike6679
1
Observable.create() 在2.0中替换了 fromEmitter()。如果你想使用旧的、可怕的 create 行为,可以使用 Observable.unsafeCreate() - Chris Horner
2个回答

4
这是一个相同线程池死锁的情况。 subscribeOn 将下游的 request 调度到与它使用的相同线程上,但如果该线程正在忙于睡眠/发射循环,则请求永远无法传递到 fromEmitter,因此在一段时间后,LATEST 开始丢弃元素,直到最后一个值(999)被传递,如果主源等待足够长的时间。 (这与我们删除的 onBackpressureBlock 类似的情况。)
如果 subscribeOn 没有执行请求的这种调度,该示例将正常工作。
我已经打开了 一个问题 来解决解决方案。
现在的解决方法是使用具有更大缓冲区大小的 observeOn(有一个重载),或者使用 fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

谢谢你简明扼要的回答,David!非常感谢。 - Chris Horner

2

这不是奇怪的,而是预期的。

让我们追踪调用。从以下开始:

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

等等。查看以下构造函数:

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize)

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

如果您没有指定缓冲区,那么默认值为RxRingBuffer.SIZE,其大小取决于平台。
因此,当您在没有缓冲区大小的情况下调用observeOn操作符时,默认值为128(Android上为16)。
解决这个问题非常简单:只需使用另一个observeOn操作符并声明缓冲区大小即可。但是,如果您为1000(与发射器中元素数量相同)声明缓冲区大小,程序仍将在未发出所有值(约170个)的情况下结束。为什么?因为程序结束了。主线程在10,000秒后结束,并且您的计算在另一个线程(Schedulers.computation())中完成。解决方法是使用CountdownLatch。请注意,永远不要在生产中使用它,它只对测试有帮助。

谢谢你的回答!我最终更喜欢David的答案,但我很感激你花费的努力去分析这些调用。 - Chris Horner

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接