我对 RxJava 的
此外,如果是这种情况,那么调度器会出现线程饥饿的情况吗?如果我的计算调度器只有5个线程,但是我有超过5个长时间运行的异步流在处理,是否可能会发生饥饿现象?或者这种情况不太可能发生,因为 RxJava 的特性?
observeOn()
和 subscribeOn()
有些疑惑。我知道它们不能并行处理单个数据流的元素。也就是说,单个数据流的元素只会被投放到一个线程中,是这样吗?下面的测试似乎证实了这一点。我还了解到,为了使单个数据流的元素能够并行处理,必须在调度器上使用 flatMap()
,例如:.flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()))
。此外,如果是这种情况,那么调度器会出现线程饥饿的情况吗?如果我的计算调度器只有5个线程,但是我有超过5个长时间运行的异步流在处理,是否可能会发生饥饿现象?或者这种情况不太可能发生,因为 RxJava 的特性?
public class Test {
public static void main(String[] args) {
Observable<String> airports = Observable.just("ABQ", "HOU",
"PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub1 " + s +
" " + Thread.currentThread().getName()));
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub2 " + s +
" " + Thread.currentThread().getName()));
sleep();
}
private static String stall(String str) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
private static void sleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}