RxJava for Android:暴露异常并重试(带延迟)

6

我有一个Observable,每30秒钟就会使用Retrofit执行REST调用:

 Subscription subscription = Observable.interval(0, REFRESH_INTERVAL, TimeUnit.SECONDS)
        .concatMap(new Func1<Long, Observable<Response>>() {
           @Override
           public Observable<Response> call(Long time) {
              return webservice.callRetrofitServiceWithRx(parameter);
           }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new UpdateSuccessAction(), new UpdateErrorAction());

可能会出现异常情况(尤其是REST调用),例如无网络连接。我希望Observable可以发出/暴露异常,以便我可以在UI上显示错误消息,但它应该继续发出项目(30秒后重试)。目前的研究结果如下:如果我没有定义任何特殊行为,Observable会发出异常并停止工作(= 30秒内不重试);如果我使用retry运算符,异常将被吞噬且不会被公开,因此无法在UI中显示错误;如果我使用onErrorReturn运算符,可以处理异常,但据我所知不能重试。我的当前解决方法是重新订阅此Observable,但我想知道是否有更优雅的解决方案。
2个回答

4

我假设doOnError可以满足你的需求(用于记录错误),结合重试使用,例如:

Subscription subscription = Observable.interval(0, REFRESH_INTERVAL, TimeUnit.SECONDS)
    .concatMap(new Func1<Long, Observable<Response>>() {
       @Override
       public Observable<Response> call(Long time) {
          return webservice.callRetrofitServiceWithRx(parameter);
       }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(new UpdateErrorAction())
    .retry()
    .subscribe(new UpdateSuccessAction());

1

在另一个答案的帮助下,我找到了解决方案。

首先,我定义了一个RetryWithDelay函数,它会在30秒后开始重试,而不是立即重试。

 private static class RetryWithDelay
     implements Func1<Observable<? extends Throwable>, Observable<?>> {

  @Override
  public Observable<?> call(Observable<? extends Throwable> attempts) {
     return attempts.flatMap(new Func1<Throwable, Observable<?>>() {
        @Override
        public Observable<?> call(Throwable throwable) {
           return Observable.timer(CallBO.REFRESH_INTERVAL_IN_SEC,             }
     });
  }
}

然后我在这个Observable-Chain中使用了它:
Subscription subscription = Observable.interval(0, REFRESH_INTERVAL, TimeUnit.SECONDS)
.concatMap(new Func1<Long, Observable<Response>>() {
   @Override
   public Observable<Response> call(Long time) {
      return webservice.callRetrofitServiceWithRx(parameter);
   }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(new UpdateErrorAction())
.retryWhen(new RetryWithDelay())
.subscribe(new UpdateSuccessAction());

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接