即使在另一个线程调用了subscribeOn(),Observable仍在主线程上运行。

23

我在其中一个Activity中遇到了奇怪的问题。当从拍照/视频返回时,在我的onActivityResult函数中,我显示一个对话框让用户命名相机名称。一旦用户按下确定,我使用请求的文件名向主题发送onNext(),并复制该文件(并显示进度对话框)。

出于某种原因,即使我调用subscribeOn(Schedulers.io()),执行复制操作的map()函数总是在主线程上调用。

@Override
protected void onActivityResult(final int requestCode, int resultCode, Intent intent) {
    ...

    final PublishSubject<String> subject = PublishSubject.create();`

    mSubscription = subject
            .subscribeOn(Schedulers.io())
            .map(new Func1<String, String>() {
                @Override
                public String call(String fileName) {
                    Log.I.d(TAG,"map");
                    return doSomeIOHeavyFuncition();
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(final String fullPath) {
                    Log.d(TAG,"onNext");
                    doSomethingOnUI(fullPath);

                    subject.onCompleted();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    ...
                }
            }, new Action0() {
                @Override
                public void call() {
                    ...
                }
            });

    final AlertDialog dialog = new AlertDialog.Builder
    ....
    .create()
            .show();

    dialog.getButton(DialogInterface.BUTTON_POSITIVE)
            .setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    String someString = getStringFromDialog(dialog);

                    dialog.dismiss();
                    InputMethodManager imm = (InputMethodManager) getSystemService(Context.INPUT_METHOD_SERVICE);
                    imm.hideSoftInputFromWindow(input.getWindowToken(), 0);

                    showProgressDialog();
                    subject.onNext(someString);
                }
            });
}

subscribeOn(Schedulers.io())调用更改为observeOn(Schedulers.io())解决了问题。但我仍然想知道为什么它之前没起作用...

1个回答

56

subscribeOnobserveOn是最容易混淆的运算符。前者确保订阅端效果在指定的调度器(线程)上发生,但这并不意味着值也会在该线程上出现。

例如,如果您的观察者在订阅时打开网络连接,您不希望该操作在主线程上运行,因此需要使用subscribeOn来指定订阅和因此网络连接将被创建的位置。

当数据最终到达时,发射线程可以是任何东西,其中一个调度程序或后台普通线程。由于我们不知道或不喜欢那个线程,因此我们想将数据的观察移动到另一个线程。这就是observeOn的作用:确保其后的运算符将在指定的调度程序上执行其onNext逻辑。Android开发人员已经使用它将值的观察移回到主线程。

然而,很少有人解释当您想在最终结果再次落在主线程之前从主线程之外进行一些额外的计算时会发生什么:使用多个observeOn运算符:

source
.observeOn(Schedulers.computation())
.map(v -> heavyCalculation(v))
.observeOn(Schedulers.io())
.doOnNext(v -> { saveToDB(v); })
.observeOn(AndroidSchedulers.mainThread())
...

3
对我来说这是新鲜事。使用RxJava一段时间以来,我一直认为subscribeOn线程适用于所有的下游操作符,直到第一个observeOn或指定了调度器的操作符出现。这不是真的吗?那么在subscribeOn之后如何选择线程? - AndroidEx
2
subscribeOn效果向上游靠近事件源。可以这样想,你有一个事件源,但是你想在不同的线程上订阅它。你可以编写 new Thread(() -> source.subscribe(s)).start();subscribeOn中会发生类似的事情。你根据订阅该源时发生的情况选择调度程序。如果它立即打开网络连接或开始读取长文件,则使用Schedulers.io()。如果它开始进行大量计算,则使用computation() - akarnokd
4
谢谢。虽然我更感兴趣的是在subscribeOn之后发生了什么,下一个操作符应用于哪个线程。从你的话中我得到的印象是不能保证它会在subscribeOn指定的线程上执行。 - AndroidEx
@AndroidEx 我认为说这不是保证的并不正确,当回调函数被调用时,线程通常会发生变化(特别是返回到主线程)。由于普通的后台工作程序在主线程上调用它们的回调函数(让我们在其上进行UI工作)。在我们使用rx进行工作时,如果我们使用其中之一,任务在此之后返回到主线程。 - Hossein Shahdoost

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接