RxJava:如何在 flatMap 操作符中从错误中恢复

3

我有一个EditText,用户在其中输入搜索查询词,当用户输入内容时,我想在我的服务器上执行即时搜索。

我尝试使用RxJava实现如下:

RxTextView.textChanges(editQuery) // I'm using RxBinding for listening to text changes
    .flatMap(new Func1<CharSequence, Observable<UserPublic[]>>() {
        @Override
        public Observable<UserPublic[]> call(CharSequence query) {
            return api.searchUsers(query); // I'm using Retrofit 1.9 for network calls. searchUsers returns an Observable<UserPublic[]>
        }
    })
    .subscribe(Observers.create(
        new Action1<UserPublic[]>() {
            @Override
            public void call(UserPublic[] userPublics) {
                processResult(userPublics);
            }
        })
        , new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                processError(throwable);
            }
    });

问题在于,如果网络调用遇到错误,整个可观察对象都会停止。因此,当用户继续输入时,什么也不会发生。

我该如何修改这段代码,以便:

  1. 每当出现网络问题时,就调用 processError
  2. 但是当用户继续输入时,新的网络调用将继续发出(导致再次出现 processResult/processError
2个回答

3
请在订阅之前使用retryWhen()操作符添加到可观察序列中。注意,retryWhen()的参数是一个函数,它接受一个Observable<Throwable>并返回一个Observable<?>。返回的类型不重要,因为该操作符使用onNext()结果来发起重试,并使用onError()onCompleted()结果来终止链。

下面是一个简单的应用程序,它等待5秒钟然后重试:

observable
  .retryWhen( errorObservable -> errorObservable.delay( 5, TimeUnit.SECONDS ) )
  .subscribe();

这里有一个更加成熟的操作,在超时时进行重试,如果发生IOException则失败:

observable
  .retryWhen( errorObservable -> errorObservable.flatMap( throwable -> { // (1)
       if ( throwable instanceof IOException ) {
         return Observable.error( throwable ); // (2)
       }
       return Observable.just(1); // (3)
     } )
  .subscribe();
  1. 使用 flatMap() 可以使你推迟决策,直到知道你要处理的是哪种错误。
  2. 返回的 observable 抛出提供的错误或其他你想更好地描述问题的内容。
  3. 提供一个仅执行 onNext() 的 observable 告诉 retryWhen() 操作符重新订阅原始 observable。

0

我建议您查看RxJava文档中的错误处理运算符页面。尝试使用不同的运算符,找到最适合您用例的那个。我认为您应该使用onErrorResumeNext()返回某种默认值,例如返回new UserPublic[0]。在此处调用processError可能会有问题,特别是如果它涉及UI,因为您可能仍在后台线程上进行处理。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接