RxJava retryWhen重新订阅传播

5
我正在使用Retrofit和RxJava在Android应用程序中进行通信,并且必须处理解析看似正常的HTTP响应(状态码为200)时的错误。
我还实现了一种使用retryWhen操作符来处理错误的方法,该操作符连接到用户的输入以决定是否重试。这通过重新订阅原始的Observable来实现。
我尝试的第一种方法是像这样:
services.getSomething()
  .map(response -> {
    if (checkBadResponse(response)) {
      throw new RuntimeException("Error on service");
    } else {
      return parseResponse(response);
    }
  }).retryWhen(this::shouldRetry);

通过这种方式,服务不会被再次调用。似乎retryWhen操作符无法重新订阅服务的Observable

最终有效的方法是实现另一个操作符,该操作符不将onCompleted向前传递,并像以下方式使用lift

public class CheckResponseStatus<T> implements Observable.Operator<ResponsePayload<T>, ResponsePayload<T>> {
    @Override
    public Subscriber<? super ResponsePayload<T>> call(Subscriber<? super ResponsePayload<T>> subscriber) {
        return new Subscriber<ResponsePayload<T>>() {
            private boolean hasError = false;

            @Override
            public void onCompleted() {
                if (!hasError)
                    subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                hasError = true;
                subscriber.onError(e);
            }

            @Override
            public void onNext(ResponsePayload<T> response) {
                if (response.isOk()) {
                    subscriber.onNext(response);
                } else {
                    hasError = true;
                    subscriber.onError(new RuntimeException(response.getMessage()));
                }
            }
        };
    }
}

使用它的方法:

services.getSomething()
  .lift(new CheckResponseStatus())
  .map(response -> parseResponse(response))
  .retryWhen(this::shouldRetry);

这是正确的处理方式吗?还是有更简单、更好的方法?


1
如果您删除.retryWhen,您的观察者是否会收到onError - Haspemulator
是的,我知道!通常错误会被传播。 - alcarv
“RetryWhen”是一个特别难以理解的操作符,因为你可以通过提供给它的函数来告诉它重新订阅。从它的代码中我能够解读出来的是,你不能忽略传入的observable,而且必须要通过它进行链式调用。 - akarnokd
我已经成功处理了 retryWhen 的逻辑,使用了未在代码中显示的 shouldRetry 方法。问题实际上出现在我尝试通过从传递给 map 的函数中抛出错误来生成错误时。 - alcarv
1个回答

4

看起来是rx-java实现中的一个bug。无论如何,从map函数中抛出异常是不好的,因为该函数应该是纯的(例如没有副作用)。在您的情况下,应该使用flatMap操作符:

services.getSomething()
  .flatMap(response -> {
    if (checkBadResponse(response)) {
      return Observable.<ResponseType>error(new RuntimeException("Error on service"));
    } else {
      return Observable.<ResponseType>just(parseResponse(response);
    }
  }).retryWhen(this::shouldRetry);

上述代码可以正常工作,如果出现错误会进行重试请求。


这确实是一种更简单的方法,而不是实现与lift一起使用的运算符。我会检查一下是否适用于我的情况。我真的没有想到在map内部抛出异常是一种副作用,只是触发RxJava已经实现的错误的一种方式,但这肯定是一个要点。我从其他问题中得到了这个想法。 - alcarv
3
你使用的rxjava版本是哪个?我刚在GitHub上发现了这个问题 https://github.com/ReactiveX/RxJava/pull/2852 看起来在 1.0.9 版本中已经修复了。 - Vladimir Mironov
我之前使用的是1.0.4版本,包含在最新的RxAndroid 0.24.0中。刚刚测试了一下1.0.10版本,可以正常工作!确实是一个bug。另外,使用flatMap的版本在1.0.4上也可以正常工作。谢谢! :) - alcarv
我认为在 map 中使用抛出异常并不是一个坏主意。实际上,我相信这比在示例中使用 flatMap 和堆叠人工 Observables 更能清楚地传达程序员的意图。 - Haspemulator

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接