RxJava调度器总是与sleep在同一个线程中运行

3

我尝试在不同的线程上运行每个计算,但无论我使用哪个调度器,它总是在单个线程上运行。

PublishProcessor processor = PublishProcessor.create();

    processor
        .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
        .observeOn(Schedulers.newThread()).subscribe(i -> {
            System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
            Thread.currentThread().sleep(5000);
        });
    processor.onNext(2);
    processor.onNext(3);
    processor.onNext(4);
    processor.onNext(5);
    processor.onNext(6);


    while (true) {}

输出将是:
2 emitted on 1
3 emitted on 1
4 emitted on 1
5 emitted on 1
6 emitted on 1
2 received on 13
3 received on 13
4 received on 13
5 received on 13
6 received on 13

线程13只有睡眠后才能处理下一个值,但我希望在这种情况下有几个独立的睡眠线程。

请问有人可以解释一下我做错了什么吗?


1
observeOn 只影响订阅者下游。如果要影响处理器工作的位置,需要使用 subscribeOn。 如果需要并行处理,可以使用内置的并行操作符或类似 flatMap(o -> o.subscribeOn(Schedulers.io())) 的方法。更多信息请参见:http://tomstechnicalblog.blogspot.fr/2015/11/rxjava-achieving-parallelization.html - Phoenix Wang
非常感谢,那个链接非常有帮助! - V. Uspensky
1个回答

2

.observeOn(...)通过将项目流切换到另一个线程来产生影响,但始终是同一线程。

如果您想为每个项目创建一个新线程,则可以执行以下操作

processor
    .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
    .flatMap(item -> Observable.just(item)
                         .subscribeOn(Schedulers.newThread()))    // make every item change to a new thread
    .subscribe(i -> {
        System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
        Thread.currentThread().sleep(5000);
    });

它仍在同一线程上运行。也许这是PublishProcessor的行为?“2发出于1 3发出于1 4发出于1 5发出于1 6发出于1 3接收于14 2接收于14 4接收于14 5接收于14 6接收于14” - V. Uspensky
解决方案是让它在.map块中休眠,紧接着.subscribeOn(Schedulers.newThread())。即: .flatMap(item -> Flowable.just(item) .subscribeOn(Schedulers.newThread()) .map(i -> { System.out.println(i.toString()+" received on "+Thread.currentThread().getId()); Thread.currentThread().sleep(5000); return i; })) - V. Uspensky

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接