RxJava中的doOnError和onErrorReturn是如何工作的?

29

我做了这些单元测试,结果完全不是我预期的:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.doOnError(throwable -> System.out.println("doOnError"));
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.onErrorReturn(throwable -> "Yeah I got this");
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}

因此,两个测试都输出了"subscribe.onError",似乎既没有调用doOnError也没有调用onErrorReturn

doOnError的文档说明为:

修改源Observable,以便在它调用onError时调用一个操作。

我不确定如何解释这一点,但我期望要么输出"doOnError",要么输出"doOnError"后跟"subscribe.onError"。

onErrorReturn的文档说明为:

指示Observable在遇到错误时发出一个项目(由指定函数返回),而不是调用onError。

因此,我期望从后一个测试中得到"got: Yeah I got this"的输出。

为什么会这样呢?

1个回答

23

无论是 doOnError 还是 onErrorReturn 都会返回一个具有更改行为的新 Observable。我同意它们的文档可能会有点误导。按照以下方式修改您的测试以获得期望的行为:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .doOnError(throwable -> System.out.println("doOnError"));

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .onErrorReturn(throwable -> "Yeah I got this");

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}

3
谢谢。现在它按预期工作了,我也学到了doOnError不会改变订阅者的行为 - 它只是添加了第二个错误侦听器。 - Nilzor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接