当retryWhen的重试次数用完时,捕获错误。

16

RetryWhen文档中提供的示例如下:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);

如果重试次数用完了,我该如何传播错误?

retryWhen子句之后添加.doOnError(System.out::println)无法捕获错误。它甚至被发出了吗?

在重试之前添加.doOnError(System.out :: println)会在所有重试中显示始终失败

7个回答

14
< p > retryWhen 的文档说明它将onError通知传递给其订阅者并终止。因此,您可以像这样做:

    final int ATTEMPTS = 3;

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> attempts
            .zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
                    i < ATTEMPTS ?
                            Observable.timer(i, SECONDS) :
                            Observable.error(n))
            .flatMap(x -> x))
            .toBlocking()
            .forEach(System.out::println);

7

Javadoc中对retryWhen的说明如下:

如果该Observable调用onComplete或onError,那么重试将在子订阅上调用onCompleted或onError。

简单地说,如果你想传播异常,你需要在重试足够次数后重新抛出原始异常。

一种简单的方法是将你的Observable.range设置为比你想要重试的次数多1。

然后,在你的zip函数中测试当前的重试次数。如果它等于NUMBER_OF_RETRIES + 1,则返回Observable.error(throwable)或重新抛出你的异常。

例如:

Observable.create((Subscriber<? super String> s) -> {
            System.out.println("subscribing");
            s.onError(new RuntimeException("always fails"));
        }).retryWhen(attempts -> {
            return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
                if (attempt == NUMBER_OF_RETRIES + 1) {
                    throw Throwables.propagate(throwable);
                }
                else {
                    return attempt;
                }
            }).flatMap(i -> {
                System.out.println("delaying retry by " + i + " second(s)");
                return Observable.timer(i, TimeUnit.SECONDS);
            });
        }).toBlocking().forEach(System.out::println);

作为旁白,doOnError不会以任何方式影响Observable-它只是提供了一个钩子来执行某些操作,如果发生错误。常见的例子是记录日志。

1

一种选择是使用 Observable.materialize()Observable.range() 的项目转换为通知。然后一旦发出 onCompleted(),就可以向下游传播错误(在下面的示例中,Pair 用于包装 Observable.range() 的通知和来自 Observable 的异常)。

   @Test
   public void retryWhen() throws Exception {

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
           .flatMap(notifAndEx -> {
            System.out.println("delay retry by " + notifAndEx + " second(s)");
            return notifAndEx.getRight().isOnCompleted()
                    ? Observable.<Integer>error(notifAndEx.getLeft())
                    : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
        });
    }).toBlocking().forEach(System.out::println);
}

    private static class Pair<L,R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }

0

您可以使用rxjava-extras中的RetryWhen构建器来获得所需的行为,该构建器已经在Maven Central上发布。请使用最新版本。

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}) 
.retryWhen(RetryWhen
   .delays(Observable.range(1, 3)
               .map(n -> (long) n), 
            TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println);

嗯,为什么要踩呢?这是一个经过单元测试的库。 - Dave Moten

0

这是我在 Kotlin 中实现它的最简单方法。重试 maxRetries 次,如果没有成功尝试,则将错误传递给 onErrorResumeNext 的源。

api.getItems()
        .retryWhen { errors ->
            errors.zipWith(Observable.range(1, maxRetries + 1), { error, i ->
                if (i <= maxRetries) {
                    Observable.timer(i.toLong(), TimeUnit.SECONDS)
                } else {
                    throw error
                }
            })
        }.onErrorResumeNext { error ->
            return@onErrorResumeNext Observable.error(factory.create(error))
        }

-1

在编程中,你需要在retryWhen之后使用onErrorResumeNext

以你的示例为例

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
            if (i == NUMBER_OF_RETRIES + 1) {
                throw Throwables.propagate(n);
            }
            else {
                return i;
            }
        }).flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        });
    })
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage());
                                              return Observable.error(t);
                                          })
    .toBlocking().forEach(System.out::println);

在这个类的底部,您可以看到一个实际的例子来理解它是如何工作的。https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java


-1

你可以使用scan函数,它返回一个带有累积索引的对,并决定是否传递错误:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value))
            .flatMap(pair -> {
                if(pair.first > MAX_RETRY_COUNT) {
                    throw new RuntimeException(pair.second);
                }
                return Observable.timer(pair.first, TimeUnit.SECONDS);
            });

或者你可以继续使用zipWith操作符,但是增加range Observable中的数字并返回一对值,而不仅仅是索引。这样,你就不会丢失关于之前throwable的信息。

attempts
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
    .flatMap(pair -> {
        if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
        System.out.println("delay retry by " + pair.first + " second(s)");
        return Observable.timer(pair.first, TimeUnit.SECONDS);
    });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接