在后台线程上处理可观测流。

23

我在使用RxAndroid进行流操作。在我的实际用例中,我正在从服务器获取列表(使用Retrofit)。我使用调度程序在后台线程上执行工作并在Android UI(主)线程上获取最终发射。

这对于网络调用很有效,但是我意识到我的网络调用后的运算符没有使用后台线程,而是在主线程上被调用。

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});

如何确保所有操作在后台线程上执行?

1个回答

63

TL;DR: 将 observeOn(AndroidSchedulers.mainThread()) 放到 filter(...) 之后。


subscribeOn(...) 用于指定 Observable 开始操作的线程。对 subscribeOn 的后续调用将被忽略。

因此,如果你写了以下代码,所有内容 将在 Schedulers.newThread() 上执行:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });

当然,你并不想这样:你想在主线程上进行doSomething操作。
这就是observeOn的作用所在。在observeOn后面的所有操作都会在该调度器上执行。因此,在你的例子中,filter是在主线程上执行的。

相反,将observeOn移动到subscribe之前:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

现在,filter将在“新线程”上执行,而doSomething将在主线程上执行。


为了进一步操作,您可以多次使用 observeOn

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

在这种情况下,获取将在一个新线程上进行,过滤将在一个计算线程上进行,而doSomething将在主线程上进行。

请查看ReactiveX - SubscribeOn operator获取官方文档。


谢谢您详细的回答!是否有一种方法可以在方法调用链的开始处定义调度程序(就像在OP中)? - WonderCsabo
Observables 遵循装饰器模式,因此按设计,这是不可能的。observeOn(以及其他方法)不像构建器模式一样返回相同的 Observable 实例,而是返回一个“包装”“旧”的 Observable”的新Observable`实例。 - nhaarman
如果我想从服务器获取并过滤另一个列表,我还必须添加.observeOn(AndroidSchedulers.mainThread())行。(目前我正在缓存可观察对象,并且我认为第一个observeOn()调用会处理。) - WonderCsabo
1
非常详细和有帮助的回答。谢谢。 - Mike Rapadas
2
非常好的回答,十分感谢! - gor
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接