RxJava + Retrofit + 轮询

14

我有一个Retrofit调用,并希望每30秒重新调用它。为此,我使用了Observable.interval(0, 30, TimeUnit.SECONDS)

。该方法可以在指定的时间间隔内重复执行操作。

Observable
    .interval(0, 30, TimeUnit.SECONDS)
    .flatMap(x -> RestApi.instance().getUsers())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> {
                    // ...
               },
               error -> Timber.e(error, "can't load users"));

我的问题:如果API调用失败,将调用onError,并且订阅将取消订阅,轮询不再工作 :-(
为了捕获API错误,我添加了一个retryWhen
Observable
    .interval(0, 30, TimeUnit.SECONDS)
    .flatMap(x -> RestApi.instance().getUsers()
                         .retryWhen(errors -> errors
                             .flatMap(error -> Observable.timer(15, TimeUnit.SECONDS))))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> {
                   // ...
               },
               error -> Timber.e(error, "can't load users"));

这段代码可以捕获错误,但是我会在一段时间内收到多个API调用。每30秒,我会收到一个新的轮询信号,这会导致一个新的API请求。但是如果API请求失败,它会自动重试。因此,我会有一个新的请求加上所有的重试。

我的问题是:如何处理API错误而不取消订阅轮询信号?

4个回答

11

了解如何正确使用retryWhenrepeatWhen,请参考这篇文章

另外,学习如何使用onError操作符,请参考这篇文章

使用Rx非常简单 :) 我不会给你最终的解决方案,只需要玩弄它并尝试理解这里的流程即可。


1
我知道第一篇文章,但不知道第二篇。第二篇描述了一个类似的问题,并提供了解决方案 :-) - Ralph Bergmann

1
如果您希望在请求失败时,getUsers 请求不会进入 onError,请将返回类型从 Observable<yourUserType> getUsers() 更改为 Observable<Response<yourUserType>> getUsers()。这样,您就可以在 Response 对象中拦截网络错误。
此方法仅适用于使用 retrofit 2.x 的情况。

0

您可以使用onErrorResumeNext或onExceptionResumeNext并传递“error”值。 您可以根据需要在此处查找其他错误处理方式。


我不确定这个是否有效。这张图片 http://reactivex.io/documentation/operators/images/onErrorResumeNext.png 看起来在onError之后也调用了onComplete - Ralph Bergmann
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Adam Radomski

-3
你可以使用这段代码,在其中实现数字尝试次数和请求之间的时间延迟。
private static int COUNTER_START = 1;
private static final int ATTEMPTS = 6;
private static final int ORIGINAL_DELAY_IN_SECONDS = 2;

remoteData.getAllRides(idSearch)
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                        @Override
                        public Observable<?> call(Observable<? extends Void> observable) {
                            return observable.flatMap(new Func1<Void, Observable<?>>() {
                                @Override
                                public Observable<?> call(Void aVoid) {
                                    if(COUNTER_START > ATTEMPTS){
                                        throw new RuntimeException();
                                    }
                                    COUNTER_START++;
                                    return Observable.timer(ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
                                }
                            });
                        }
                    })
            .takeUntil(new Func1<RideResponse, Boolean>() {
                @Override
                public Boolean call(RideResponse rideResponse) {
                    return rideResponse.getState().equals("finished");//this is the validation finish polling

                }
            }).filter(new Func1<RideResponse, Boolean>() {
                @Override
                public Boolean call(RideResponse rideResponse) {
                    return rideResponse.getState().equals("finished"); //this is the validation finish polling
                }
            }).map(rideResponse -> Log.e("",rideResponse.toString()))
        .doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接