ObserveOn 和 SubscribeOn - 工作的位置在哪里

68
基于阅读这个问题:SubscribeOn和ObserveOn有什么区别? ObserveOn设置了代码在Subscribe处理程序中执行的位置: stream.Subscribe(_ => { // 在这里执行 }); SubscribeOn方法设置了流设置所在的线程。
我了解到,如果没有明确设置,则使用TaskPool。
现在我的问题是,假设我像这样做:
Observable.Interval(new Timespan(0, 0, 1))
          .Where(t => predicate(t))
          .SelectMany(t => lots_of(t))
          .ObserveOnDispatcher()
          .Subscribe(t => some_action(t));

如果在调度程序上执行了some_action,那么Where predicateSelectMany lots_of 将在哪里执行?

2个回答

214

SubscribeOn 拦截对 IObservable<T>中的单个方法 Subscribe 的调用以及对Subscribe返回的IDisposable 句柄上的 Dispose 的调用。

ObserveOn 拦截对IObserver<T> 中的方法 OnNext, OnCompletedOnError 的调用。

这两种方法都会在指定的调度程序上进行相应的调用。

SubscribeOn 使得observable的Subscribe方法在指定的调度程序或上下文中异步执行,用于避免从当前线程调用observable的Subscribe方法导致阻塞。

请注意,SubscribeOn仅影响应用到其上的observable,而不是整个可观察链。

SubscribeOn在您调用Subscribe和订阅的可观察对象之间,拦截调用并使其异步化。它还会影响订阅的处理。 Subscribe返回一个用于取消订阅的IDisposable句柄。 SubscribeOn确保对所提供的调度程序调用Dispose

在试图理解SubscribeOn的作用时,常见的困惑点是可观察对象的Subscribe处理程序可能在同一线程上调用OnNextOnCompletedOnError。然而,它的目的不是影响这些调用。流在Subscribe方法返回之前完成是很常见的情况。例如,Observable.Return就是这样做的。让我们来看一下。

如果您使用我编写的Spy方法,并运行以下代码:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

你会得到如下输出(线程 ID 可能会有所不同):

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

您可以看到整个订阅处理程序在同一线程上运行,并在返回之前完成。

让我们使用SubscribeOn来异步运行它。我们将同时对Return可观察对象和SubscribeOn可观察对象进行监听:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

这会输出以下内容(我添加了行号):

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.

01-主方法在线程1上运行。

02-Return可观察对象在调用线程上进行评估。我们只是在这里获取IObservable,尚未订阅任何内容。

03-SubscribeOn可观察对象在调用线程上进行评估。

04-现在最后我们调用SubscribeOnSubscribe方法。

05-Subscribe方法异步完成...

06-...然后线程1返回到主方法。这就是SubscribeOn的作用!

07-与此同时,SubscribeOn在默认调度程序上安排了对Return的调用。 这里在线程2上接收到它。

08-并且像Return一样,它在Subscribe线程上调用OnNext...

09-而SubscribeOn现在只是一个通过。

10,11-同理OnCompleted

12-最后Return订阅处理程序完成。

希望这可以解决SubscribeOn的目的和效果!

ObserveOn

如果您认为 SubscribeOn 是拦截器的 Subscribe 方法,该拦截器将调用传递到不同的线程,则 ObserveOn 执行相同的工作,但对于 OnNext OnCompleted OnError 调用。

回想一下我们的原始示例:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

这导致了以下输出:

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

现在让我们改用ObserveOn

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

我们得到以下输出:

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2

01 - 主方法运行在线程1上。

02 - 与以前一样,Return可观察对象在调用线程上进行评估。 我们只是在这里获取IObservable,还没有订阅任何内容。

03 - ObserveOn可观察对象也在调用线程上进行评估。

04 - 现在我们订阅了ObserveOn可观察对象,同样在调用线程上进行,然后...

05 - ...然后将调用传递给Return可观察对象。

06 - 现在,Return在其Subscribe处理程序中调用OnNext

07 - 这是ObserveOn的效果。我们可以看到OnNext被异步地计划在线程2上。

08 - 同时,Return在线程1上调用了OnCompleted...

09 - 并且Return的订阅处理程序完成了...

10 - 随后,ObserveOn的订阅处理程序也完成了...

11 - 因此,控制权被返回到主方法

12 - 与此同时,ObserveOnReturnOnCompleted调用传递到线程2。这可能发生在09-11的任何时间,因为它是异步运行的。只是碰巧现在调用了。

典型的使用情况是什么?

在GUI中,您最常见到的是在订阅长时间运行的observable并希望尽快离开调度程序线程时使用SubscribeOn - 可能是因为您知道它是其中一个observable,它在订阅处理程序中完成所有工作。 将其应用于可观察链的末尾,因为这是订阅时调用的第一个可观察对象。

在GUI中,您最常见到的是在确保将OnNextOnCompletedOnError调用驳回到调度程序线程时使用ObserveOn。 将其应用于可观察链的末尾,尽可能晚地转换回来。

希望您能看出,答案是ObserveOnDispatcherWhereSelectMany执行的线程不会产生任何影响-这完全取决于stream从哪个线程调用它们! stream的订阅处理程序将在调用线程上调用,但是不知道WhereSelectMany将在哪里运行,而不知道stream如何实现。

超出Subscribe调用寿命的observable

到目前为止,我们一直专注于Observable.ReturnReturnSubscribe处理程序内完成其流。虽然这并不罕见,但流经常超过Subscribe处理程序的寿命。 例如,请查看Observable.Timer

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");

这将返回以下内容:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2

您可以清楚地看到订阅完成后,OnNextOnCompleted在另一个线程上被调用。

请注意,任何SubscribeOnObserveOn的组合都不会对Timer选择在哪个线程或调度程序上调用OnNextOnCompleted产生任何影响。

当然,您可以使用SubscribeOn来确定Subscribe线程:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

(我故意在这里切换到NewThreadScheduler,以防止出现Timer恰好获取与SubscribeOn相同的线程池线程时引起混淆)

给出:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3

在这里,您可以清楚地看到主线程在线程 (1) 调用其 Subscribe 后返回,但 Timer 订阅却使用了自己的线程 (2),而 OnNextOnCompleted 调用运行在线程 (3) 上。

现在,对于 ObserveOn,让我们将代码更改为(对于那些跟随代码进行的人,请使用 nuget 包 rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

这段代码有些不同。第一行确保我们有一个调度程序,而且我们还引入了ObserveOnDispatcher - 这与ObserveOn非常相似,只是它指定我们应该使用ObserveOnDispatcher所评估的线程的DispatcherScheduler

这段代码会输出以下内容:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1

注意,调度程序(以及主线程)是线程1。 Timer仍然在它选择的线程(2)上调用OnNextOnCompleted,但ObserveOnDispatcher将调用封送回调用线程(1)。

还要注意,如果我们阻塞调度程序线程(例如通过Thread.Sleep),您会发现ObserveOnDispatcher将被阻塞(此代码最好在LINQPad主方法中使用):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");

你会看到类似于这样的输出:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1

当使用 ObserveOnDispatcher 进行调用时,只有在 Sleep 执行完毕后才能成功退出。

关键要点

需要记住的是,Reactive Extensions 实质上是一个自由线程库,并尽可能地惰性处理它运行的线程 - 您必须有意干预 ObserveOnSubscribeOn 并将特定调度程序传递给接受它们的运算符以更改这一点。

观察者的消费者无法控制其内部操作 - ObserveOnSubscribeOn 是包装观察者和可观察对象表面区域以在线程之间进行调用的装饰器。希望这些示例已经说明了这一点。


@Ben,谈论Where运行的位置有点谬误 - 它被评估,它有一个订阅处理程序,并且在其生命周期中将在多个时间点调用OnNextOnCompletedOnError。因此,它可以在许多地方“运行”。恰好发生的是,它将在下游订阅调用来自哪个线程上运行,并在上游Observable调用 OnXXX 方法的任何线程上调用其观察者的OnXXX方法。 - James World
6
我会添加一些小提示,以帮助那些试图记住所有这些内容的人。简单概括James所说的部分:SubscribeOn 拦截对 Subscribe 的调用,而 ObserverOn 则拦截对 IObserver<T> 的调用。这两种方法都会使相应的调用在指定的调度程序上执行。 - cwharris
2
非常棒的回答。仅凭这个就值得 Rx MVP。A++++,会再次点赞。 - user1228
1
我认为需要注意的是,SubscribeOn 不仅可以调度对 observables 的 Subscribe,还可以调度对 subscriptions 的 Dispose - Dave Sexton
2
S.O.应该有微支付功能来支付这种类型的答案。这可能是一个盈利的ICO的好商业计划! - Sentinel
显示剩余7条评论

23

我认为James的回答非常清晰和全面。但即便如此,我仍发现自己需要解释其中的区别。

因此,我创建了一个非常简单/愚蠢的示例,让我可以通过图形演示调度程序正在调用哪些事物。我创建了一个名为MyScheduler的类,它会立即执行操作,但会更改控制台的颜色。

SubscribeOn调度程序的文本输出以红色输出,而ObserveOn调度程序的输出则以蓝色输出。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace SchedulerExample
{

    class Program
    {
        static void Main(string[] args)
        {
            var mydata = new[] {"A", "B", "C", "D", "E"};
            var observable = Observable.Create<string>(observer =>
                                            {
                                                Console.WriteLine("Observable.Create");
                                                return mydata.ToObservable().
                                                    Subscribe(observer);
                                            });

            observable.
                SubscribeOn(new MyScheduler(ConsoleColor.Red)).
                ObserveOn(new MyScheduler(ConsoleColor.Blue)).
                Subscribe(s => Console.WriteLine("OnNext {0}", s));

            Console.ReadKey();
        }
    }
}

这将输出:

scheduler

参考 MyScheduler(不适用于实际使用):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace SchedulerExample
{
    class MyScheduler : IScheduler
    {
        private readonly ConsoleColor _colour;

        public MyScheduler(ConsoleColor colour)
        {
            _colour = colour;
        }

        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            return Execute(state, action);
        }

        private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            var tmp = Console.ForegroundColor;
            Console.ForegroundColor = _colour;
            action(this, state);
            Console.ForegroundColor = tmp;
            return Disposable.Empty;
        }

        public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public DateTimeOffset Now
        {
            get { return DateTime.UtcNow; }
        }
    }
}

1
请确保您添加了NuGet包“Rx-Main”。 - Contango
喜欢这个颜色示例!return mydata.ToObservable().Subscribe(observer);会返回一个IDisposable,它的Dispose方法也会以红色打印 - 如果有人告诉它打印任何东西的话。 - nitzel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接