RxJava:我可以使用retry()函数并加上延迟吗?

99

我正在使用RxJava在我的Android应用程序中异步处理网络请求。现在,我想在一定时间后仅重试失败的网络请求。

有没有办法在Observable上使用retry(),但只在一定延迟后重试?

是否有一种方法让Observable知道它目前正在重试(与第一次尝试相对)?

我查看了debounce()/throttleWithTimeout(),但它们似乎在做不同的事情。

编辑:

我认为我找到了一种方法来实现这个功能,但我希望确认这是否是正确的方法或者是否还有其他更好的方法。

我所做的是:在我的Observable.OnSubscribe的call()方法中,在调用Subscribers的onError()方法之前,我简单地让线程睡眠所需的时间。因此,要每隔1000毫秒重试一次,我会执行类似以下的操作:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

由于此方法已在IO线程上运行,因此它不会阻塞UI。我唯一能看到的问题是即使第一个错误会有延迟报告,因此即使没有retry(),延迟仍然存在。我希望如果延迟不是在错误之后应用,而是在重试之前应用(但显然不是在第一次尝试之前)。

17个回答

184

您可以使用retryWhen()运算符向任何Observable添加重试逻辑。

以下类包含重试逻辑:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

使用方法:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

2
Error:(73, 20) error: incompatible types: RetryWithDelay cannot be converted to Func1<? super Observable<? extends Throwable>,? extends Observable<?>> - Nima GT
3
@nima 我曾经遇到过同样的问题,将 RetryWithDelay 更改为以下内容:http://pastebin.com/6SiZeKnC - user1480019
2
看起来 RxJava 的 retryWhen 操作符自我最初编写以来已经发生了变化。我会更新答案。 - kjones
3
请更新此答案以符合 RxJava 2 的要求。 - Vishnu M.
4
这里可以找到与Observable 无关的实现(支持所有RxJava 2.x兼容的可观察对象,基于Publisher接口)。链接在这里:https://pastebin.com/Jj1kWHKa。 - Eido95
显示剩余6条评论

24

19

这个示例适用于jxjava 2.2.2版本:

无延迟重试:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

延迟后重试:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

如果 someConnection.send() 失败,我们的源单就会失败。 当这种情况发生时,retryWhen 内部的错误可观察对象会发出错误信号。 我们将该信号延迟 300ms 并将其发送回以表示重试。 take(5) 保证在收到五个错误后,我们的信号可观察对象会终止。 retryWhen 看到终止后,在第五次失败后不会再进行重试。


2
这种方法似乎很直接,但当重试次数用尽而不是抛出错误时,它会忽略错误。对于某些情况来说可能很方便。 - racs

9

这是基于Ben Christensen的代码片段的解决方案,我看到了RetryWhen ExampleRetryWhenTestsConditional(我不得不将n.getThrowable()更改为n才能使其工作)。我使用evant/gradle-retrolambda使lambda符号在Android上工作,但您不必使用lambda(尽管强烈建议使用)。对于延迟,我实现了指数退避,但您可以插入任何想要的退避逻辑。为了完整起见,我添加了subscribeOnobserveOn操作符。我正在使用ReactiveX/RxAndroid来获取AndroidSchedulers.mainThread()

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

2
这看起来很优雅,但我不使用lambda函数,我该如何在没有lambda的情况下编写代码?@amitai-hoze - ericn
还有,我该如何编写代码以便将此重试函数用于其他“Observable”对象? - ericn
没关系,我已经使用了 kjones 的解决方案,对我来说非常完美,谢谢。 - ericn

8

我使用一个包装函数retryObservable(MyRequestObservable, retrycount, seconds)代替MyRequestObservable.retry,该函数返回一个新的Observable,处理延迟的间接方式,所以我可以执行以下操作:

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}

非常抱歉之前没有及时回复 - 不知何故,我错过了SO的通知,告知有人回答了我的问题... 我给你的回答点了赞,因为我喜欢这个想法,但是根据SO的原则,我不确定是否应该接受这个回答,因为它更像是一个绕过而不是直接回答。但是我猜,既然你提供了一个绕过的方法,对于我最初的问题来说答案就是“不行”... - david.mihola

8

基于kjones的回答,这是RxJava 2.x带延迟重试的Kotlin版本扩展。将Observable替换为Flowable,以创建相同的扩展。

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

然后将其应用于observable observable.retryWithDelay(3, 1000)


这个能否也用 Single 替换呢? - Papps
4
没问题,注意flatMap需要使用Flowable.timerFlowable.error,即使函数是Single<T>.retryWithDelay - JuliusScript

7
retryWhen是一个复杂的,甚至可能有缺陷的操作符。官方文档和至少一个回答在这里使用range操作符,如果没有重试,则会失败。请参见我与ReactiveX成员David Karnok的讨论
我通过将flatMap更改为concatMap并添加RetryDelayStrategy类来改进了kjones的答案。flatMap不保留发射顺序,而concatMap保留发射顺序,这对于具有退避延迟的情况非常重要。正如其名称所示,RetryDelayStrategy允许用户从各种生成重试延迟的模式中进行选择,包括退避。代码可在我的GitHub上找到,其中包括以下测试用例:
  1. 第一次尝试成功(无重试)
  2. 重试1次后失败
  3. 尝试重试3次,但第2次成功,因此不重试第3次
  4. 第3次重试成功
请参见setRandomJokes方法。

5

kjones的答案相同,但更新到了最新版本 对于RxJava 2.x版本: ('io.reactivex.rxjava2:rxjava:2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

用法:

// 在现有的observable中添加重试逻辑。 // 最多重试3次,每次延迟2秒。

observable
    .retryWhen(new RetryWithDelay(3, 2000));

3

现在使用 RxJava 版本 1.0+,您可以使用 zipWith 实现带延迟的重试。

kjones 的回答进行修改。

修改后

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}

1

您可以在retryWhen操作符返回的Observable中添加延迟。

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

您可以在此处查看更多示例。 https://github.com/politrons/reactive


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接