RxJava:Observable和默认线程

10

我有以下代码:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

这是输出结果:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

重要细节: 我在另一个线程(在Android中是main线程)中调用了subscribe方法。

看起来Observable类默认是同步的,并且在发出事件(如s.onNext)的同一个线程上执行所有操作符(例如map)和通知订阅者的操作,对吗?我在想...这是预期行为还是我误解了什么?实际上,我希望至少onNextonComplete回调函数会在调用方的线程上调用,而不是在发出事件的线程上调用。我理解得正确吗?在这种特定情况下,实际的调用方线程并不重要吗?至少在异步生成事件时如此。

另一个问题是——如果我从某个外部源(即我自己没有生成它)接收到一些Observable作为参数...作为其用户,我没有办法检查它是同步还是异步的,我只能通过subscribeOnobserveOn方法明确指定我想要在哪里接收回调,对吗?

谢谢!

1个回答

12
RxJava在并发方面没有明确的意见。如果您不使用任何其他机制(如observeOn / subscribeOn),它将在订阅线程上生成值。请不要在操作符中使用低级构造(如Thread),否则可能会破坏协议。
由于使用了Thread,onNext将从调用线程(“background-thread-1”)调用。订阅发生在调用(UI-Thread)上。链中的每个操作符都将从'background-thread-1'-calling-Thread调用。订阅onNext也将从'background-thread-1'调用。
如果您想在非调用线程上生成值,请使用:subscribeOn。如果您想将线程切换回主线程,请在链中的某个位置使用observeOn。最好在订阅之前使用。
示例:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
            .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
            .map(integer -> integer) // map happens on Computational-Threads
            .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
            .subscribe(integer -> {
                // called from mainThread
            });

以下是一篇很好的解释。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接