如何在Android上使用RxJava在多个线程上运行订阅者

4

我刚接触RxJava,正在为一个(我猜)简单的问题而苦恼。我想要在三个线程中同时处理subscribe部分。因此我使用了FixedThreadPool。以下是示例代码:

Observer.just("one", "two", "three", "four")
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))
.subscribe(new Observer<String>() {

    public void onNext(String string) {
        Log.d(TAG, "Started: " + string);
        SystemClock.sleep(1000);
        Log.d(TAG, "Ended: " + string);
    }

    (...)

}

期望结果:

Started: one
Started: two
Started: three
Ended: one
Started: four
Ended: two
Ended: three
Ended: four

实际结果:

Started: one
Ended: one
Started: two
Ended: two
Started: three
Ended: three
Started: four
Ended: four

我做错了什么?
1个回答

5

RxJava Observables 是顺序执行的,subscribeOnobserveOn 运算符不会并行运行值。

你可以通过使用模数键对值进行分组,通过 observeOn 运算符运行它们,并合并结果来实现最接近的效果:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 100)
.groupBy(v -> count.getAndIncrement() % 3)
.flatMap(g -> g
    .observeOn(Schedulers.computation())
    .map(v ->  Thread.currentThread() + ": " + v))
.toBlocking()
.forEach(System.out::println);

谢谢回答。但是我是否正确理解解决方案“为每个线程设置单独的队列”这一点,因此如果任务所需的时间不同,则在结束时,某些线程可能会比一个线程早完成几个任务。我的问题是,RxJava是否支持在多个线程之间使用共享队列? - reinra
这个设置中没有工作窃取,也没有共享队列。 - akarnokd

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接