RxAndroid:SubscribeOn 和 ObserveOn 有什么区别?

104

我正在学习Rx-java和Rxandroid2,不太清楚SubscribeOn和 ObserveOn之间的主要区别。

6个回答

135

SubscribeOn用于指定一个Observable将要操作的Scheduler。 ObserveOn用于指定观察者将要观察这个Observable的Scheduler。

因此,基本上SubscribeOn大多是在后台线程中订阅(执行)(您不希望在等待Observable时阻塞UI线程),而且在ObserveOn中,您想要在主线程上观察结果...

如果您熟悉AsyncTask,则SubscribeOn类似于doInBackground方法,而ObserveOn类似于onPostExecute...


1
"一个观察者将会观察这个可观察对象。"听起来像绕口令。 - chris

86

假如你发现上述答案充斥着术语:

简而言之

 Observable.just("Some string")                 
           .map(str -> str.length())              
           .observeOn(Schedulers.computation())   
           .map(length -> 2 * length)   
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.io())
           .subscribe(---)

观察一个Observable...在一个 IO 线程中执行map函数(因为我们正在“订阅”那个线程)...现在切换到一个计算线程并执行map(length -> 2 * length)函数... 最后确保您在 (observeOn()) 主线程上观察输出。

不管怎样

observeOn() 只是改变下游的所有操作符的线程。人们通常有这样一个误解,即observeOn也作用于上游,但实际上它不是这样的。

下面的示例将更好地解释它...

Observable.just("Some string")                  // UI
       .map(str -> str.length())               // UI
       .observeOn(Schedulers.computation())   // Changing the thread
       .map(length -> 2 * length)            // Computation
       .subscribe(---)                       // Computation

subscribeOn() 只会影响 Observable 订阅时所使用的线程,并且它将一直保持在这个线程下游。

Observable.just("Some String")              // Computation
  .map(str -> str.length())                // Computation
  .map(length -> 2 * length)              // Computation
  .subscribeOn(Schedulers.computation()) // -- changing the thread
  .subscribe(number -> Log.d("", "Number " + number)); // Computation

位置无关紧要 (subscribeOn())

为什么?因为它只会影响订阅的时间。

遵循与 subscribeOn 相关的方法

-> 基本示例: Observable.create

create 内指定的所有任务都会在 subscribeOn 中指定的线程上运行。

另一个示例:Observable.justObservable.fromObservable.range

注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。

如果你想使用阻塞函数,请使用

Observable.defer(() -> Obervable.just(blockingMenthod())));

重要事实:

subscribeOn 不能与 Subjects 一起使用。

多个 subscribeOn

如果在流中有多个 subscribeOn 实例,则只有第一个实例具有实际效果。

订阅和 subscribeOn

人们认为 subscribeOnObservable.subscribe 有关,但它与此无关。它只影响订阅阶段。

来源:Tomek Polański(Medium


5
两个答案都有一定的正确性,问题在于如果使用了多个subscribeOn调用,则subscribeOn的位置很重要。但是,对于单个调用,位置并不重要。 - Daksh Gargas
如果我在subscribe()内部进行一些UI工作,会阻塞我的UI吗?我使用了subscribeOn(Schedulers.io()),但我不确定。另外,Observable.defer()会阻塞主线程吗? - Hilal
1
@Hilal,为了执行UI任务,你必须切换到主线程。如果你在主线程上执行严重的计算,那么它会冻结UI,这就是为什么我们通常在除了AndroidSchedulers.main()之外的任何线程中执行任务,并将结果带回主线程以更新UI。 - Daksh Gargas
1
@Hilal 谈到 Observable.defer(),它与阻塞主线程无关,这部分取决于您使用的 Scheduler.defer() 唯一的作用是在观察者订阅之前不创建 Observable,并为每个观察者创建一个新的 Observable。 - Daksh Gargas
@Hilal,你无法在 Schedulers.io() 上执行任何 UI 任务,你必须切换到 main 线程。否则会抛出异常! - Daksh Gargas
显示剩余3条评论

72

总结

  • 使用observeOn方法来设置回调函数的线程,即“下游”的代码块,例如在doOnNextmap中。
  • 使用subscribeOn方法来设置初始化的线程,即“上游”的代码块,例如doOnSubscribeObservable.justObservable.create
  • 这两种方法都可以被多次调用,每次调用都会覆盖之前的设置。 位置很重要。

让我们通过一个示例来学习这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事情,因此在后台线程中执行这项密集运算是很自然的,以避免冻结应用程序。

observeOn

我们可以随意调用observeOn方法,并控制所有下游回调函数所在的线程。它非常易于使用,而且像你预期的那样工作。

例如,我们将在主UI线程上显示进度条,然后在另一个线程上执行密集/阻塞操作,最后回到主UI线程更新结果:

    Observable.just("user1032613")

            .observeOn(mainThread) // set thread for operation 1
            .doOnNext {
                /* operation 1 */
                print("display progress bar")
                progressBar.visibility = View.VISIBLE
            }

            .observeOn(backThread) // set thread for operation 2 and 3
            .map {
                /* operation 2 */
                print("calculating")
                Thread.sleep(5000)
                it.length
            }

            .doOnNext {
                /* operation 3 */
                print("finished calculating")
            }

            .observeOn(mainThread) // set thread for operation 4
            .doOnNext {
                /* operation 4 */
                print("hide progress bar and display result")
                progressBar.visibility = View.GONE
                resultTextView.text = "There're $it characters!"
            }

            .subscribe()
在上面的例子中,/* operation 1 */mainThread上运行,因为我们在它上面的那一行使用了observeOn(mainThread)进行设置;然后我们再次调用observeOn切换到backThread,因此/* operation 2 */将在那里运行。因为我们在链接/* operation 3 */之前没有更改它,它也会像/* operation 2 */一样在后台线程中运行;最后,我们再次调用observeOn(mainThread),以确保/* operation 4 */从主线程更新UI。

subscribeOn

所以我们学习了observeOn设置后续回调的线程。还有什么我们忽略了吗?嗯,Observable本身以及其方法,如just()create()subscribe()等,也是需要执行的代码。这是对象沿着流传递的方式。我们使用subscribeOn为与Observable相关的代码设置线程。
如果我们删除所有回调(由先前讨论的observeOn控制),我们就剩下了“骨架代码”,默认情况下它将在编写代码的任何线程(可能是主线程)上运行:
    Observable.just("user1032613")
            .observeOn(mainThread)
            .doOnNext {
            }
            .observeOn(backThread)
            .map {
            }
            .doOnNext {
            }
            .observeOn(mainThread)
            .doOnNext {
            }
            .subscribe()

如果我们不满意这个空的主线程代码,我们可以使用subscribeOn来改变它。例如,第一行Observable.just("user1032613")可能不仅仅是从我的用户名创建一个流——它可能是从互联网上获得的字符串,或者您正在使用doOnSubscribe进行其他繁重的操作。在这种情况下,您可以调用subscribeOn(backThread)将一些代码放到另一个线程中。

应该把subscribeOn放在哪里?

在撰写本答案时,有一些误解认为“只调用一次”、“位置不重要”和“如果多次调用,只有第一次计数”。经过大量研究和实验,发现subscribeOn可以有用地被多次调用。

因为Observable使用建造者模式(一种将方法链接在一起的花哨名称),subscribeOn是反向应用的。因此,这个方法设置了它上面的代码的线程,完全相反于observeOn

我们可以使用doOnSubscribe方法来进行实验。这个方法会在订阅事件上触发,并且它在subscribeOn设置的线程上运行:

    Observable.just("user1032613")
            .doOnSubscribe {
                print("#3 running on main thread")
            }
            .subscribeOn(mainThread) // set thread for #3 and just()
            .doOnNext {
            }
            .map {
            }
            .doOnSubscribe {
                print("#2 running on back thread")
            }
            .doOnNext {
            }
            .subscribeOn(backThread) // set thread for #2 above
            .doOnNext {
            }
            .doOnSubscribe {
                print("#1 running on default thread")
            }
            .subscribe()

如果您按照从下到上的方式阅读上述示例,就像Builder模式执行代码一样,那么遵循逻辑可能会更容易。

在这个例子中,第一行Observable.just("user1032613")print("#3")在同一线程中运行,因为它们之间没有更多的subscribeOn。这为那些只关心just()create()内部代码的人创造了“只有第一个调用重要”的幻觉。这种情况很快就会崩溃,一旦您开始做更多的事情。


脚注:

为了简洁起见,在示例中定义了线程和print()函数:

val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")

我是在我的回答中提到了"subscribeOn()只能工作一次"。看了你的解释,似乎确实有道理。我可以借用一下你的解释来更新我的回答吗? - Daksh Gargas
谢谢。一个后续问题,如果 observeOn 控制它下面的代码,而 subscribeOn 控制它上面的代码,那么如果一个代码块在其上方有 observeOn,在其下方有 subscribeOn,它将遵循哪个线程?抱歉,我有点困惑。 - Mon
@Mon subscribeOn 设置框架“骨架”本身的线程,而 observeOn 设置回调函数/代码块的线程。它们不控制相同的内容。在您的情况下,“代码块”在 observeOn 设置的任何线程上运行。 - WSBT
非常好的回复,附带示例代码。为了完整起见,您能否添加subscribe()中的代码,并解释它运行在哪个线程上以及subscribeOnobserveOn如何影响它? - CACuzcatlan
这不是我在RxJava 3中看到的行为。 - elonderin

22

如果有人发现rx java的描述很难理解(比如我),这里提供了纯java的解释:

subscribeOn()

Observable.just("something")
  .subscribeOn(Schedulers.newThread())
  .subscribe(...);

等同于:

Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();

由于Observablesubscribe()时发出值,在这里subscribe()进入单独线程,因此值也在与subscribe()相同的线程中发出。这就是为什么它可以在"上游"(影响之前操作的线程)和"下游"工作的原因。

observeOn()

Observable.just("something")
  .observeOn(Schedulers.newThread())
  .subscribe(...);

等同于:

Observable observable = Observable.just("something")
  .subscribe(it -> new Thread(() -> ...).start());

这里Observable在主线程中发出值,只有监听方法在单独的线程中执行。


4
当您订阅一个observable时,一个流程开始工作,它沿着链路往上走,然后再往下走。subscribe部分与向上链接相关,而observe部分与向下链接相关。 一旦到达链的顶部,订阅阶段基本上已经完成。事件开始被发射,然后调用下游链路中的maps、filters等。
SubscribeOn会影响其放置位置以上的订阅调用,例如doOnSubscribe。
ObserveOn会影响其放置位置以下的观察调用,例如doOnNext、map、flatmap等。
两者都会改变用于继续向上或向下流动的线程。
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class SubscribeVsObserveOn {

    public static void main(String[] args) throws InterruptedException {

        System.out.println("Ordinal 0: " + Thread.currentThread().getName());

        final CountDownLatch latch = new CountDownLatch(1);

        Observable
            .just("a regular string.")
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
            .map(s -> s)
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
            .map(s -> s)
            .subscribe(s -> latch.countDown());

        latch.await();
    }
}

这是输出结果:

Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4

3
这个答案并没有新内容,我只是想再澄清一点。
  1. Let's assume that we have two threads.

     val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
     val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
    

  1. As the answers described, observeOn will set Downstream, and subscribeOn will set Upstream. But what if both of them was used? For check this, I added logs line by line.

    Observable.just("what if use both")
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map A " + Thread.currentThread().name)
         it + " A"
     }
    
     // observeOn
     .observeOn(Schedulers.from(pool1)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map B " + Thread.currentThread().name)
         it + " B"
     }
    
     // subscribeOn
     .subscribeOn(Schedulers.from(pool2)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
     .map {
        Log.d("Thread", "both, map C " + Thread.currentThread().name)
        it + " C"
      }
    
     // observeOn main
     .observeOn(AndroidSchedulers.mainThread())
     .doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
     .subscribe(
         { result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
         , { error -> {} }
         )
    
结果是:
both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2

both, doOnNext A Thread 2
both, map A Thread 2

both, doOnNext B Thread 1
both, map B Thread 1

both, doOnNext C Thread 1
both, map C Thread 1

main main
main subscribe main
result: what if use both A B C

如您所见,doOnSubscribe 首先被调用,从下到上。这意味着 subscribe 操作符优先于其他操作符,因此处理第一行代码的线程是线程 2

然后其他操作符被逐行调用。在 observeOn 之后,线程被更改为线程 1。然后,在调用subscribe之前,再次调用observeOn,以更改线程为主线程。(不用关心 AndroidSchedulers,它只是一种调度器)

简而言之:

  • 第一条路径中,subscribeOn 被首先从下到上调用。
  • 第二条路径中,observeOn 与其他代码一起从上到下被调用。
  • 在 RxJava2 和 RxJava3 中表现相同。

“doOnSubscribe”首先被调用,从下到上。你展示的“doOnSubscribe”输出是“C->A->B”。但如果“从下到上”是准确的,那么输出应该是“C->B->A”,因为这是代码中从下到上的顺序。 - user167019

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接