SubscribeOn
拦截对 IObservable<T>
中的单个方法 Subscribe
的调用以及对Subscribe
返回的IDisposable
句柄上的 Dispose
的调用。
ObserveOn
拦截对IObserver<T>
中的方法 OnNext
, OnCompleted
和 OnError
的调用。
这两种方法都会在指定的调度程序上进行相应的调用。
SubscribeOn
使得observable的Subscribe
方法在指定的调度程序或上下文中异步执行,用于避免从当前线程调用observable的Subscribe
方法导致阻塞。
请注意,SubscribeOn
仅影响应用到其上的observable,而不是整个可观察链。
SubscribeOn
在您调用Subscribe
和订阅的可观察对象之间,拦截调用并使其异步化。它还会影响订阅的处理。 Subscribe
返回一个用于取消订阅的IDisposable
句柄。 SubscribeOn
确保对所提供的调度程序调用Dispose
。
在试图理解SubscribeOn
的作用时,常见的困惑点是可观察对象的Subscribe
处理程序可能在同一线程上调用OnNext
、OnCompleted
或OnError
。然而,它的目的不是影响这些调用。流在Subscribe
方法返回之前完成是很常见的情况。例如,Observable.Return
就是这样做的。让我们来看一下。
如果您使用我编写的Spy方法,并运行以下代码:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
你会得到如下输出(线程 ID 可能会有所不同):
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
您可以看到整个订阅处理程序在同一线程上运行,并在返回之前完成。
让我们使用SubscribeOn
来异步运行它。我们将同时对Return
可观察对象和SubscribeOn
可观察对象进行监听:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
这会输出以下内容(我添加了行号):
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
01-主方法在线程1上运行。
02-Return
可观察对象在调用线程上进行评估。我们只是在这里获取IObservable
,尚未订阅任何内容。
03-SubscribeOn
可观察对象在调用线程上进行评估。
04-现在最后我们调用SubscribeOn
的Subscribe
方法。
05-Subscribe
方法异步完成...
06-...然后线程1返回到主方法。这就是SubscribeOn的作用!
07-与此同时,SubscribeOn
在默认调度程序上安排了对Return
的调用。 这里在线程2上接收到它。
08-并且像Return
一样,它在Subscribe
线程上调用OnNext
...
09-而SubscribeOn
现在只是一个通过。
10,11-同理OnCompleted
12-最后Return
订阅处理程序完成。
希望这可以解决SubscribeOn
的目的和效果!
ObserveOn
如果您认为 SubscribeOn
是拦截器的 Subscribe
方法,该拦截器将调用传递到不同的线程,则 ObserveOn
执行相同的工作,但对于 OnNext
, OnCompleted
和 OnError
调用。
回想一下我们的原始示例:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
这导致了以下输出:
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
现在让我们改用ObserveOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
我们得到以下输出:
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
01 - 主方法运行在线程1上。
02 - 与以前一样,Return
可观察对象在调用线程上进行评估。 我们只是在这里获取IObservable
,还没有订阅任何内容。
03 - ObserveOn
可观察对象也在调用线程上进行评估。
04 - 现在我们订阅了ObserveOn
可观察对象,同样在调用线程上进行,然后...
05 - ...然后将调用传递给Return
可观察对象。
06 - 现在,Return
在其Subscribe
处理程序中调用OnNext
。
07 - 这是ObserveOn
的效果。我们可以看到OnNext
被异步地计划在线程2上。
08 - 同时,Return
在线程1上调用了OnCompleted
...
09 - 并且Return
的订阅处理程序完成了...
10 - 随后,ObserveOn
的订阅处理程序也完成了...
11 - 因此,控制权被返回到主方法
12 - 与此同时,ObserveOn
将Return
的OnCompleted
调用传递到线程2。这可能发生在09-11的任何时间,因为它是异步运行的。只是碰巧现在调用了。
典型的使用情况是什么?
在GUI中,您最常见到的是在订阅长时间运行的observable并希望尽快离开调度程序线程时使用SubscribeOn
- 可能是因为您知道它是其中一个observable,它在订阅处理程序中完成所有工作。 将其应用于可观察链的末尾,因为这是订阅时调用的第一个可观察对象。
在GUI中,您最常见到的是在确保将OnNext
,OnCompleted
和OnError
调用驳回到调度程序线程时使用ObserveOn
。 将其应用于可观察链的末尾,尽可能晚地转换回来。
希望您能看出,答案是ObserveOnDispatcher
对Where
和SelectMany
执行的线程不会产生任何影响-这完全取决于stream从哪个线程调用它们! stream的订阅处理程序将在调用线程上调用,但是不知道Where
和SelectMany
将在哪里运行,而不知道stream
如何实现。
超出Subscribe调用寿命的observable
到目前为止,我们一直专注于Observable.Return
。 Return
在Subscribe
处理程序内完成其流。虽然这并不罕见,但流经常超过Subscribe
处理程序的寿命。 例如,请查看Observable.Timer
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
这将返回以下内容:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
您可以清楚地看到订阅完成后,OnNext
和OnCompleted
在另一个线程上被调用。
请注意,任何SubscribeOn
或ObserveOn
的组合都不会对Timer
选择在哪个线程或调度程序上调用OnNext
和OnCompleted
产生任何影响。
当然,您可以使用SubscribeOn
来确定Subscribe
线程:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
(我故意在这里切换到NewThreadScheduler
,以防止出现Timer
恰好获取与SubscribeOn
相同的线程池线程时引起混淆)
给出:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
在这里,您可以清楚地看到主线程在线程 (1) 调用其 Subscribe
后返回,但 Timer
订阅却使用了自己的线程 (2),而 OnNext
和 OnCompleted
调用运行在线程 (3) 上。
现在,对于 ObserveOn
,让我们将代码更改为(对于那些跟随代码进行的人,请使用 nuget 包 rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
这段代码有些不同。第一行确保我们有一个调度程序,而且我们还引入了ObserveOnDispatcher
- 这与ObserveOn
非常相似,只是它指定我们应该使用ObserveOnDispatcher
所评估的线程的DispatcherScheduler
。
这段代码会输出以下内容:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
注意,调度程序(以及主线程)是线程1。 Timer
仍然在它选择的线程(2)上调用OnNext
和OnCompleted
,但ObserveOnDispatcher
将调用封送回调用线程(1)。
还要注意,如果我们阻塞调度程序线程(例如通过Thread.Sleep
),您会发现ObserveOnDispatcher
将被阻塞(此代码最好在LINQPad主方法中使用):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
你会看到类似于这样的输出:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
当使用 ObserveOnDispatcher
进行调用时,只有在 Sleep
执行完毕后才能成功退出。
关键要点
需要记住的是,Reactive Extensions 实质上是一个自由线程库,并尽可能地惰性处理它运行的线程 - 您必须有意干预 ObserveOn
、SubscribeOn
并将特定调度程序传递给接受它们的运算符以更改这一点。
观察者的消费者无法控制其内部操作 - ObserveOn
和 SubscribeOn
是包装观察者和可观察对象表面区域以在线程之间进行调用的装饰器。希望这些示例已经说明了这一点。
Where
运行的位置有点谬误 - 它被评估,它有一个订阅处理程序,并且在其生命周期中将在多个时间点调用OnNext
、OnCompleted
和OnError
。因此,它可以在许多地方“运行”。恰好发生的是,它将在下游订阅调用来自哪个线程上运行,并在上游Observable调用其OnXXX
方法的任何线程上调用其观察者的OnXXX
方法。 - James WorldSubscribeOn
拦截对Subscribe
的调用,而ObserverOn
则拦截对IObserver<T>
的调用。这两种方法都会使相应的调用在指定的调度程序上执行。 - cwharrisSubscribeOn
不仅可以调度对 observables 的Subscribe
,还可以调度对 subscriptions 的Dispose
。 - Dave Sexton