RxJava调度器-线程行为和饥饿问题?

3
我对 RxJava 的 observeOn()subscribeOn() 有些疑惑。我知道它们不能并行处理单个数据流的元素。也就是说,单个数据流的元素只会被投放到一个线程中,是这样吗?下面的测试似乎证实了这一点。我还了解到,为了使单个数据流的元素能够并行处理,必须在调度器上使用 flatMap(),例如:.flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()))
此外,如果是这种情况,那么调度器会出现线程饥饿的情况吗?如果我的计算调度器只有5个线程,但是我有超过5个长时间运行的异步流在处理,是否可能会发生饥饿现象?或者这种情况不太可能发生,因为 RxJava 的特性?
public class Test {
    public static void main(String[] args) {


        Observable<String> airports = Observable.just("ABQ", "HOU", 
            "PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");


        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub1 " + s +
                " " + Thread.currentThread().getName()));

        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub2 " + s +
                " " + Thread.currentThread().getName()));

        sleep();
    }

    private static String stall(String str) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return str;
    }

    private static void sleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
1个回答

2
使用flatMap,可能会出现一个异步源被其他源淹没而无法在其自己的源上取得进展的情况。然而,在实践中,由于操作系统和JVM的故障会给予足够的缓冲空间以及由于flatMap本身的背压和仲裁,我还没有见过这种情况发生。如果您担心这种情况,请使用带有maxConcurrent参数的flatMap重载,并限制并发订阅的数量。
RxJava大多数是以非阻塞方式编写的,因此当需要合并或组合源时,它们不会真正等待彼此。
计算调度程序是一组单线程执行器池,并按照轮询方式分配给调用者。我不知道标准执行器的公平性如何。

是的,我猜测 RxJava 的非阻塞性质一定与减轻问题可能性有关。我从未遇到过线程资源方面的问题,但我在推测是否可能存在这种情况。知道这点很好,谢谢! - tmn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接