如何在Android应用程序中结合RxJava的Single和Completable Retrofit调用

10

我的当前Android应用程序采用RetrofitRxJava来协调我的网络调用。

我将我的HTTP GET模拟为Single<Response<String>>,POST模拟为Completable

我需要的调用顺序如下:

按顺序调用GET(1)、GET(2)、GET(3)

并行调用POST(1)、POST(2)

当POST(1)和POST(2)都完成时,调用GET(4)。

我有一个部分解决方案。我已经编码了前三个GET的调用,然后是POST调用。

我的代码类似于:

// First we make the 3 GET calls sequentially Single> call1 = apiService.getData1(); Single> call2 = apiService.getData2(); Single> call3 = apiService.getData3();

// Once they are all done, then fire off the 2 POSTs in parallel Completable.mergeArray(apiService.postData1(), apiService.postData2()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(() -> { // *** Here is where I need to add the final GET *** });

Single.concat(getRequests())
                .subscribeOn(Schedulers.single())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        manageExecutions(combineExecutions());
                    }
                })
                .subscribe();

    /**
     * @return
     */
    private static Iterable<Single<Response<String>>> getRequests() {
        final API_CALL_GET[] apiCalls = API_CALL_GET.values();
        final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_GET apiCall : apiCalls) {
            requests.add(apiCall.request());
        }

        return requests;
    }

public enum API_CALL_GET {

    GET_ONE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getOne(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataOne)
                .doOnError(error -> ever(error));
        }
    }, GET_TWO {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getTwo(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataTwo)
                .doOnError(error -> ever(error));
        }
    },
    GET_THREE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getThree(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataThree)
                .doOnError(error -> ever(error));
        }
    };

    public abstract Single<Response<String>> request();

}


    private static Action manageExecutions(final List<Completable> completables) {

        return new Action() {
            @Override
            public void run() throws Exception {
                Completable
                .concat(completables)
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        accumulateAmounts();
                    }
                })
                .subscribe();
            }
        };
    }


    /**
     * @return
     */
    private static List<Completable> combineExecutions() {
        final API_CALL_POST[] apiCalls = API_CALL_POST.values();
        final List<Completable> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_POST apiCall : apiCalls) {
            requests.addAll(apiCall.requests());
        }

        return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
    }

public enum API_CALL_POST {

    POST_ONE {
        @Override
        public List<Completable> requests() {
            return NetworkController.postRecommenderExecutions();
        }
    },
    POST_TWO {
        @Override
        public List<Completable> requests() {
            return NetworkController.postSavedSearcheExecutions();
        }
    };

    public abstract List<Completable> requests();

}


    public static List<Completable> postONE() {
        final List<Completable> completables = new ArrayList<>();

        final List<OneDO> oneDOS = fetchOnes();

        for (final OneDO oneDO : oneDOS) {
            completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }




    public static List<Completable> postTWO() {
        final List<Completable> completables = new ArrayList<>();

        final List<TwoDO> twoDOS = fetchTwos();

        for (final TwoDO twoDO : twoDOS) {
            completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }
我遇到困难的是正确地连接我的调用,例如,我认为我可以开发出类似于这个伪代码的解决方案:Single.concat(GET_1... GET_N).onComplete(POST_1... POST_N).onComplete(GET_LAST)。然而,我的目前的部分解决方案只调用第一组GET,然后是POST,而且GET和POST的调用没有"链式"链接。我无法看出如何创建支持我的用例的调用链。是否可以在链式调用中结合 Single -> Completable -> Single?基于Daniil的回答,我最终得到了这个解决方案:-
 Single.concat(getRequests())
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
                .ignoreElements()
                .andThen(Completable.merge(combineExecutions()))
                .doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
                .doOnComplete(() -> Controller.accumulateTotals())
                .subscribe();

2
为什么你不能使用下面答案中提到的运算符?如果你想链接可观察对象并在以下可观察对象中使用每个可观察对象的结果,那么有很多运算符可供选择。例如:map(result =>将其用于另一个函数)。这是你要找的吗? - Mr.O
1
我要等到月底回到工作岗位才能尝试任何东西。此外,我没有使用任何单个或可完成项的结果的要求,我只需要知道每个项目都已成功完成。 - Hector
2个回答

10

在 Kotlin 中,它看起来会像这样:

fun generateGetRequests(): List<Single<Response<String>>> {
    return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}

fun generatePostRequests(): List<Completable> {
    return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}

fun doSomethingWithResponses(responses: Array<Any>) {
    // Do Something, like save to db
}

fun runRequests() {
    Single.zip(generateGetRequests(), { responses ->
        doSomethingWithResponses(responses)
    }).ignoreElements()
        .andThen(Completable.merge(generatePostRequests()))
        .subscribeOn(Schedulers.io())
        .subscribe()
}

1
这已经接近我所需的了。我不得不用Single.concat()替换Single.zip(),以便按预期执行所有GET请求。当我使用zip()时,列表中最后一个GET请求永远不会被执行。我还必须将.toCompletable()更改为.ignoreElements(),因为toCompletable已被弃用。 - Hector

4

不同类型的链式操作可以通过将它们转换为共享的响应式类型(例如Observable)并连接,或者使用其中一种续集方法,如flatMapXandThen

 someSingle
 .flatMapCompletable(result1 -> {
     sideEffect(result1);
     return someCompletable;
 })
 .andThen(Single.defer(() -> {
     sideEffectAfterCompletable();
     return someOtherSingle;
 }))
 ...

1
我正在寻找一个解决方案,可以接受任意数量的Single和Completable;如果依赖于"andThen",那么是否意味着我必须将单个和可完成项的数量与"andThen"的出现次数匹配? - Hector
3
单个值具有您可能要解决的价值。将它们混合在一起需要额外的工作,尤其是当它们具有不同的类型时。如果将"Completable"与其混合,还需决定如何处理它们的任何副作用(如果有的话)。 - akarnokd
在这种情况下,我所有的单个返回一个Json负载,我将其“createAll”到我的Realm本地数据库中。我所有的completable都会发布一条消息并且不会收到响应,因此从高层次来看,我的多个单个和completable在某种程度上是相同的。 - Hector

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接