RxJava对长度不同的Retrofit Observable数组进行zip操作

3

我有一个长度不固定的Observable数组。我想将这些请求合并(即进行一系列API请求并等待它们全部完成),但我无法弄清如何实现zip函数。

Observable.zip(observables, new FuncN<List<ResponseBody>>() {
    @Override
    public List<ResponseBody> call(Object... args) {
        return Arrays.asList(args); <- compile error here
    }
});

这里的 `obserables` 是一个 `List>` 数组,其长度不确定。
在 `zip` 函数中,调用参数无法更正为 `ResponseBody...`。如何使其返回 `Observable>`?
这是否是 `FuncN` 在 RxJava 1.x.x 设计中的限制?
P.S. 我正在使用 RxJava 1.1.6。

问题到底是什么?FuncN方法的返回类型是使用zip方法创建的操作符将发出的项目类型,这意味着您将拥有Observable<List<ResponseBody>>。 - yosriz
1
@yosriz,问题是Arrays.asList(args)不是List<ResponseBody>类型,这导致编译错误。 - chubao
3个回答

3

只需合并您的可观察对象,并使用toList收集结果:

Observable.merge(observables).toList()

请问 merge 操作会阻塞每一个请求吗?我希望同时异步地执行多个请求,以使它们不会相互阻塞。 - chubao
1
merge不会阻塞,但是Observable.concat会。 - Maksim Ostrovidov
我刚试了一下,发现请求是一个接一个地发送的,也就是说只有第一个请求在第二个请求被触发之前才有响应... - chubao
你的意思是你得到了这样的顺序:第一次请求,第一次响应,第二次请求,第二次响应,第三次请求,第三次响应...吗? - Maksim Ostrovidov
是的,当我使用合并时,我会得到这个订单。 - chubao
当订阅和结果传递之间没有延迟时,预计结果会比下一个Observable开始更快地传递。但是,如果您在每个Observable上测试延迟,一切都会就位。 - Maksim Ostrovidov

0
用户合并并确保所有可观察请求都有自己的线程。 你可以尝试这样做:
private void runMyTest() {
    List<Single<String>> singleObservableList = new ArrayList<>();
    singleObservableList.add(getSingleObservable(500, "AAA"));
    singleObservableList.add(getSingleObservable(300, "BBB"));
    singleObservableList.add(getSingleObservable(100, "CCC"));
    Single.merge(singleObservableList)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println);
}

private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
    return Single
            .create((SingleOnSubscribe<String>) e -> {
                    try {
                        Thread.sleep(waitMilliSeconds);
                    } catch (InterruptedException exception) {
                        exception.printStackTrace();
                    }
                    System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = " +Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
                    if(!e.isDisposed()) e.onSuccess(name);
                })
            .subscribeOn(Schedulers.io());
}

输出:

System.out: 名称 = CCC,等待毫秒数 = 100,线程名称 = RxCachedThreadScheduler-4,ID = 463

System.out: CCC

System.out: 名称 = BBB,等待毫秒数 = 300,线程名称 = RxCachedThreadScheduler-3,ID = 462

System.out: BBB

System.out: 名称 = AAA,等待毫秒数 = 500,线程名称 = RxCachedThreadScheduler-2,ID = 461

System.out: AAA


0

我确认merge操作符有效,并找到导致顺序的罪魁祸首:第1个请求,第1个响应,第2个请求,第2个响应,第3个请求,第3个响应

observables列表中的可观察对象在添加时未订阅Scheduler.io()

之前:

observables.add(mViewModelDelegate.get().getApiService()
                    .rxGetCategoryBrandList(baseUrl, categoryId));

之后:

observables.add(mViewModelDelegate.get().getApiService()
                    .rxGetCategoryBrandList(baseUrl, categoryId)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接