RxJava:如何使用zip运算符处理错误?

31

我正在使用RxJava和RxAndroid与Retrofit2配合使用。

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

在上述两个观察者上使用如下的zip操作符。
 Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
            @Override
                public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
                  ArrayList<TestData> testDataList = new ArrayList();
                      // Add test data from response responseOne & responseTwo
                  return testDataList;
            } 
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() {

        @Override
        public void onNext(ArrayList<TestData> testDataList) {

        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted" );
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError Throwable: " + t.toString() );
        }
    });

如果在responseOneObservableresponseTwoObservable中的任何一个出现错误,则会直接调用testDataObservable的订阅者的onError方法。

即使两个可观察对象中的任何一个成功响应,我也希望继续在zip运算符的call方法中进行。

如何使用zip运算符处理错误响应?


我相信 onErrorResumeNext 应该可以让你做到这一点。 - John O'Reilly
你知道如果其中一个Observable返回NULL该怎么办吗?https://stackoverflow.com/questions/50334430/rxjava-group-two-responses-one-of-which-might-be-null-with-zip-operator - yozhik
3个回答

45

你可以使用onErrorResumeNext返回一些Observable,或者使用onErrorReturn返回一些默认值给zip,例如:

Observable.zip(
   responseOneObservable
       .onErrorReturn(new Func1<Throwable, ResponseOne>() {
        @Override
        public ResponseOne call(final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Func1<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo call(final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

更多信息请见错误处理运算符


更新: 在RxJava 2.0中,您必须使用Function而不是Func1

import io.reactivex.functions.Function;
...
Observable.zip(
   responseOneObservable
       .onErrorReturn(new Function<Throwable, ResponseOne>() {
        @Override
        public ResponseOne apply(@NonNull final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Function<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo apply(@NonNull final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

或者使用Lambda表达式:

Observable.zip(
   responseOneObservable
       .onErrorReturn(throwable -> new ResponseOne()),
   responseTwoObservable
       .onErrorReturn(throwable -> new ResponseTwo()),
   ...

或者使用 Kotlin:

Observable.zip(
   responseOneObservable
       .onErrorReturn { ResponseOne() },
   responseTwoObservable
       .onErrorReturn { ResponseTwo() },
   ...

在RxJava 2.0中,Func1已被Function所取代。参考链接:https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0 - Ehsan Khaveh

4
你可以使用onErrorResumeNext运算符从两个可观测对象中返回默认响应。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    .onErrorResumeNext(throwable -> {/*some default value*/})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .onErrorResumeNext(throwable -> {/*some default value*/})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

请参阅Dan Lew的RxJava中的错误处理


0

在单个压缩的可观察对象上使用onErrorResumeNext指令,以便在出现错误时发出默认项。

请参阅错误处理运算符


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接