我正在学习Rx-java和Rxandroid2,不太清楚SubscribeOn和 ObserveOn之间的主要区别。
我正在学习Rx-java和Rxandroid2,不太清楚SubscribeOn和 ObserveOn之间的主要区别。
SubscribeOn用于指定一个Observable将要操作的Scheduler。 ObserveOn用于指定观察者将要观察这个Observable的Scheduler。
因此,基本上SubscribeOn大多是在后台线程中订阅(执行)(您不希望在等待Observable时阻塞UI线程),而且在ObserveOn中,您想要在主线程上观察结果...
如果您熟悉AsyncTask,则SubscribeOn类似于doInBackground方法,而ObserveOn类似于onPostExecute...
假如你发现上述答案充斥着术语:
简而言之
Observable.just("Some string")
.map(str -> str.length())
.observeOn(Schedulers.computation())
.map(length -> 2 * length)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(---)
观察一个Observable...在一个 IO 线程中执行map函数(因为我们正在“订阅”那个线程)...现在切换到一个计算线程并执行
map(length -> 2 * length)
函数... 最后确保您在 (observeOn()
) 主线程上观察输出。
不管怎样,
observeOn()
只是改变下游的所有操作符的线程。人们通常有这样一个误解,即observeOn
也作用于上游,但实际上它不是这样的。
下面的示例将更好地解释它...
Observable.just("Some string") // UI
.map(str -> str.length()) // UI
.observeOn(Schedulers.computation()) // Changing the thread
.map(length -> 2 * length) // Computation
.subscribe(---) // Computation
subscribeOn()
只会影响 Observable 订阅时所使用的线程,并且它将一直保持在这个线程下游。
Observable.just("Some String") // Computation
.map(str -> str.length()) // Computation
.map(length -> 2 * length) // Computation
.subscribeOn(Schedulers.computation()) // -- changing the thread
.subscribe(number -> Log.d("", "Number " + number)); // Computation
位置无关紧要 (
subscribeOn()
)
为什么?因为它只会影响订阅的时间。
遵循与
subscribeOn
相关的方法
-> 基本示例: Observable.create
create
内指定的所有任务都会在 subscribeOn
中指定的线程上运行。
另一个示例:Observable.just
、Observable.from
或 Observable.range
注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。
如果你想使用阻塞函数,请使用
Observable.defer(() -> Obervable.just(blockingMenthod())));
重要事实:
subscribeOn
不能与 Subjects
一起使用。
多个
subscribeOn
:
如果在流中有多个 subscribeOn
实例,则只有第一个实例具有实际效果。
订阅和
subscribeOn
人们认为 subscribeOn
与 Observable.subscribe
有关,但它与此无关。它只影响订阅阶段。
来源:Tomek Polański(Medium)
subscribeOn
调用,则subscribeOn
的位置很重要。但是,对于单个调用,位置并不重要。 - Daksh GargasAndroidSchedulers.main()
之外的任何线程中执行任务,并将结果带回主线程以更新UI。 - Daksh GargasObservable.defer()
,它与阻塞主线程无关,这部分取决于您使用的 Scheduler
。.defer()
唯一的作用是在观察者订阅之前不创建 Observable,并为每个观察者创建一个新的 Observable。 - Daksh GargasSchedulers.io()
上执行任何 UI 任务,你必须切换到 main 线程。否则会抛出异常! - Daksh GargasobserveOn
方法来设置回调函数的线程,即“下游”的代码块,例如在doOnNext
或map
中。subscribeOn
方法来设置初始化的线程,即“上游”的代码块,例如doOnSubscribe
、Observable.just
或Observable.create
。让我们通过一个示例来学习这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事情,因此在后台线程中执行这项密集运算是很自然的,以避免冻结应用程序。
我们可以随意调用observeOn
方法,并控制所有下游回调函数所在的线程。它非常易于使用,而且像你预期的那样工作。
例如,我们将在主UI线程上显示进度条,然后在另一个线程上执行密集/阻塞操作,最后回到主UI线程更新结果:
Observable.just("user1032613")
.observeOn(mainThread) // set thread for operation 1
.doOnNext {
/* operation 1 */
print("display progress bar")
progressBar.visibility = View.VISIBLE
}
.observeOn(backThread) // set thread for operation 2 and 3
.map {
/* operation 2 */
print("calculating")
Thread.sleep(5000)
it.length
}
.doOnNext {
/* operation 3 */
print("finished calculating")
}
.observeOn(mainThread) // set thread for operation 4
.doOnNext {
/* operation 4 */
print("hide progress bar and display result")
progressBar.visibility = View.GONE
resultTextView.text = "There're $it characters!"
}
.subscribe()
在上面的例子中,/* operation 1 */
在mainThread
上运行,因为我们在它上面的那一行使用了observeOn(mainThread)
进行设置;然后我们再次调用observeOn
切换到backThread
,因此/* operation 2 */
将在那里运行。因为我们在链接/* operation 3 */
之前没有更改它,它也会像/* operation 2 */
一样在后台线程中运行;最后,我们再次调用observeOn(mainThread)
,以确保/* operation 4 */
从主线程更新UI。
observeOn
设置后续回调的线程。还有什么我们忽略了吗?嗯,Observable
本身以及其方法,如just()
、create()
、subscribe()
等,也是需要执行的代码。这是对象沿着流传递的方式。我们使用subscribeOn
为与Observable
相关的代码设置线程。observeOn
控制),我们就剩下了“骨架代码”,默认情况下它将在编写代码的任何线程(可能是主线程)上运行: Observable.just("user1032613")
.observeOn(mainThread)
.doOnNext {
}
.observeOn(backThread)
.map {
}
.doOnNext {
}
.observeOn(mainThread)
.doOnNext {
}
.subscribe()
如果我们不满意这个空的主线程代码,我们可以使用subscribeOn
来改变它。例如,第一行Observable.just("user1032613")
可能不仅仅是从我的用户名创建一个流——它可能是从互联网上获得的字符串,或者您正在使用doOnSubscribe
进行其他繁重的操作。在这种情况下,您可以调用subscribeOn(backThread)
将一些代码放到另一个线程中。
subscribeOn
放在哪里?在撰写本答案时,有一些误解认为“只调用一次”、“位置不重要”和“如果多次调用,只有第一次计数”。经过大量研究和实验,发现subscribeOn
可以有用地被多次调用。
因为Observable
使用建造者模式(一种将方法链接在一起的花哨名称),subscribeOn
是反向应用的。因此,这个方法设置了它上面的代码的线程,完全相反于observeOn
。
我们可以使用doOnSubscribe
方法来进行实验。这个方法会在订阅事件上触发,并且它在subscribeOn
设置的线程上运行:
Observable.just("user1032613")
.doOnSubscribe {
print("#3 running on main thread")
}
.subscribeOn(mainThread) // set thread for #3 and just()
.doOnNext {
}
.map {
}
.doOnSubscribe {
print("#2 running on back thread")
}
.doOnNext {
}
.subscribeOn(backThread) // set thread for #2 above
.doOnNext {
}
.doOnSubscribe {
print("#1 running on default thread")
}
.subscribe()
如果您按照从下到上的方式阅读上述示例,就像Builder模式执行代码一样,那么遵循逻辑可能会更容易。
在这个例子中,第一行Observable.just("user1032613")
与print("#3")
在同一线程中运行,因为它们之间没有更多的subscribeOn
。这为那些只关心just()
或create()
内部代码的人创造了“只有第一个调用重要”的幻觉。这种情况很快就会崩溃,一旦您开始做更多的事情。
为了简洁起见,在示例中定义了线程和print()
函数:
val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
subscribeOn()
只能工作一次"。看了你的解释,似乎确实有道理。我可以借用一下你的解释来更新我的回答吗? - Daksh GargasobserveOn
控制它下面的代码,而 subscribeOn
控制它上面的代码,那么如果一个代码块在其上方有 observeOn
,在其下方有 subscribeOn
,它将遵循哪个线程?抱歉,我有点困惑。 - MonsubscribeOn
设置框架“骨架”本身的线程,而 observeOn
设置回调函数/代码块的线程。它们不控制相同的内容。在您的情况下,“代码块”在 observeOn
设置的任何线程上运行。 - WSBTsubscribe()
中的代码,并解释它运行在哪个线程上以及subscribeOn
和observeOn
如何影响它? - CACuzcatlan如果有人发现rx java的描述很难理解(比如我),这里提供了纯java的解释:
Observable.just("something")
.subscribeOn(Schedulers.newThread())
.subscribe(...);
等同于:
Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();
由于Observable
在subscribe()
时发出值,在这里subscribe()
进入单独线程,因此值也在与subscribe()
相同的线程中发出。这就是为什么它可以在"上游"(影响之前操作的线程)和"下游"工作的原因。
observeOn()
Observable.just("something")
.observeOn(Schedulers.newThread())
.subscribe(...);
等同于:
Observable observable = Observable.just("something")
.subscribe(it -> new Thread(() -> ...).start());
这里Observable
在主线程中发出值,只有监听方法在单独的线程中执行。
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
public class SubscribeVsObserveOn {
public static void main(String[] args) throws InterruptedException {
System.out.println("Ordinal 0: " + Thread.currentThread().getName());
final CountDownLatch latch = new CountDownLatch(1);
Observable
.just("a regular string.")
.doOnSubscribe(disposable ->
System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
.map(s -> s)
.doOnSubscribe(disposable ->
System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
.map(s -> s)
.subscribe(s -> latch.countDown());
latch.await();
}
}
这是输出结果:
Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4
Let's assume that we have two threads.
val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
As the answers described, observeOn
will set Downstream
, and subscribeOn
will set Upstream
. But what if both of them was used? For check this, I added logs line by line.
Observable.just("what if use both")
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map A " + Thread.currentThread().name)
it + " A"
}
// observeOn
.observeOn(Schedulers.from(pool1))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map B " + Thread.currentThread().name)
it + " B"
}
// subscribeOn
.subscribeOn(Schedulers.from(pool2))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map C " + Thread.currentThread().name)
it + " C"
}
// observeOn main
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
.subscribe(
{ result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
, { error -> {} }
)
both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2
both, doOnNext A Thread 2
both, map A Thread 2
both, doOnNext B Thread 1
both, map B Thread 1
both, doOnNext C Thread 1
both, map C Thread 1
main main
main subscribe main
result: what if use both A B C
如您所见,doOnSubscribe
首先被调用,从下到上。这意味着 subscribe
操作符优先于其他操作符,因此处理第一行代码的线程是线程 2。
然后其他操作符被逐行调用。在 observeOn
之后,线程被更改为线程 1。然后,在调用subscribe
之前,再次调用observeOn
,以更改线程为主线程。(不用关心 AndroidSchedulers,它只是一种调度器)
简而言之:
subscribeOn
被首先从下到上调用。observeOn
与其他代码一起从上到下被调用。