RxJava retryWhen 立即调用

3

我在使用rxjava时遇到了一个非常具体的问题或误解,希望有人能够帮我解决。

我正在运行rxjava 2.1.5,并且有以下代码片段:

public static void main(String[] args) {

    final Observable<Object> observable = Observable.create(emitter -> {
        // Code ... 
    });

    observable.subscribeOn(Schedulers.io())
            .retryWhen(error -> {
                System.out.println("retryWhen");
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"));

}

执行完此操作后,程序会输出:
retryWhen

Process finished with exit code 0

我的问题是:为什么在订阅Observable时立即调用retryWhen?这个Observable什么都不做。

我想要的是,当发射器上出现onError时,才调用retryWhen。我是否误解了rx的工作原理?

谢谢!

添加新代码片段:

public static void main(String[] args) throws InterruptedException {

    final Observable<Object> observable = Observable.create(emitter -> {
        emitter.onNext("next");
        emitter.onComplete();
    });

    final CountDownLatch latch = new CountDownLatch(1);
    observable.subscribeOn(Schedulers.io())
            .doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
            .retryWhen(error -> {
                System.out.println("retryWhen: " + error.toString());
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"),
                         () -> latch.countDown());

    latch.await();
}

调用了Emitter的onNext和complete方法,但是未调用DoOnError方法。输出结果如下:

retryWhen: io.reactivex.subjects.SerializedSubject@35fb3008 subscribeNext

进程以退出代码0结束。

2个回答

4

retryWhenObserver订阅它时调用所提供的函数,因此您将得到一个伴随着发出主序列失败Throwable的序列。您应该将逻辑组合到在这个Function中得到的Observable中,以便最终,一个Throwable将导致另一端的值。

Observable.error(new IOException())
    .retryWhen(e -> {
         System.out.println("Setting up retryWhen");
         int[] count = { 0 };
         return e
            .takeWhile(v -> ++count[0] < 3)
            .doOnNext(v -> { System.out.println("Retrying"); });
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

由于e -> { }函数体为每个订阅者单独执行,因此您可以安全地拥有每个订阅者状态,例如重试计数器。

使用e -> e.retry()没有效果,因为输入错误流永远不会调用其onError


2
是的,我得出了这个结论。有时候似乎写下一个问题能帮助你解决它。谢谢! - Sason Ohanian

0
一个问题是,由于您使用retryWhen()创建了一个线程,但您的应用程序似乎已经完成,因此您将不再收到任何结果。为了看到这种行为,您可能需要添加一个while循环来保持应用程序运行。
实际上,这意味着您需要在代码末尾添加类似以下内容的代码:
while (true) {}

另一个问题是您的示例中没有发出任何错误。您需要发出至少一个值才能调用 onNext(),否则它将不会重复,因为它正在等待该值。
以下是一个工作示例,它发出一个值,然后发出一个错误并重复。您可以使用它:
.retryWhen(errors -> errors)

这与

是一样的。
.retryWhen(errors -> errors.retry())

工作样例:

 public static void main(String[] args) {
        Observable
                .create(e -> {
                    e.onNext("test");
                    e.onError(new Throwable("test"));
                })
                .retryWhen(errors -> errors.retry())
                .subscribeOn(Schedulers.io())
                .subscribe(
                        next -> System.out.println("subscribeNext"),
                        error -> System.out.println("subscribeError"),
                        () -> System.out.println("onCompleted")
                );

        while (true) {

        }
    }

你需要发出一个结果的原因是,Observable 需要发出一个值,否则它会等待直到收到新值。
这是因为 onError 只能在 subscribe 中被调用一次,但 onNext 发出 1..* 值。
你可以通过使用 doOnError() 来检查这种行为,它会在重试 Observable 时提供错误信息。
Observable
            .create(e -> e.onError(new Exception("empty")))
            .doOnError(e -> System.out.println("error received " + e))
            .retryWhen(errors -> errors.retry())
            .subscribeOn(Schedulers.io())
            .subscribe(
                    nextOrSuccess -> System.out.println("nextOrSuccess " + nextOrSuccess),
                    error -> System.out.println("subscribeError")
            );

hold on, starting netbeans - Emanuel
{btsdaf} - Sason Ohanian
{btsdaf} - Emanuel
{btsdaf} - Sason Ohanian
{btsdaf} - Sason Ohanian
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接