这里的SubscribeOn()是做什么用的?

3

我正在通过解决随机问题并毫不羞耻地问一些愚蠢的新手问题来自学响应式编程。在弄清线程调度的工作原理时,我成功地使自己困惑了。虽然我相当确定这段代码没有逻辑意义,但我也不能理解其中发生了什么。想要理解可能会对我有所帮助。以下是代码:

var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();

var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());

var subscription = emitter.SubscribeOn(newThreadScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);

Console.WriteLine("DONE.");
Console.ReadLine();

我原本期望的是可能:

one    
DONE.
Complete!

由于我不确定SubscribeOn()的作用,因此可能会有交错。但是我得到的结果是:

DONE.
Complete!

这里到底发生了什么?为什么项目在完成之前没有被生产出来?在这种情况下,ObserveOn()的工作方式符合我的预期,我也明白原因:它在其他线程上运行委托,并且它们可以与“DONE”交错。那么SubscribeOn()到底是做什么的呢?


我对响应式编程一无所知,但从你的代码来看,你订阅了newThreadScheduler,但是所有的工作和计划都是针对testScheduler触发的。 - TyCobb
1个回答

2
这里出现的问题只是一个竞态条件。
如果我们将所有代码撤回到只有
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();

var subscription = emitter
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );



Console.WriteLine("DONE.");
Console.ReadLine();

我们得到了相同的结果。 通过使用Subject<T>,您将不会获得任何缓存行为,除了OnCompleted通知。 SubscribeOn操作符将在提供的IScheduler实例上安排任何订阅工作。 在订阅Subject<T>的情况下,几乎没有工作需要完成。 这几乎就像注册回调到一组回调列表一样简单。
将工作安排到NewThreadScheduler上将创建一个新线程,然后创建一个内部事件循环来处理安排的工作。 这非常快,但需要创建一个新线程、一个EventloopScheduler,并执行到新线程的上下文切换。
在您的示例中,您将OnNextOnCompleted通知安排在一个TestScheduler上。 然后您使用NewThreadScheduler进行了SubscribeOn。 接下来,您开始处理TestScheduler实例的所有安排工作。 对于这些虚拟安排的项进行处理只需迭代这些安排的项、执行委托并推进一个虚拟时钟即可。这是非常快的。
更具体地说,下面的代码类似于您编写的内容。
var newThreadScheduler =  new NewThreadScheduler();

var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));

foreach (var callback in callbacks)
{
    callback("one");
}

Console.WriteLine("Done");

这里我们只是有一个回调动作列表(称其为订阅者或观察者)。 然后在新线程上异步安排添加其中一个回调。 然后立即迭代回调并向每个回调发送字符串“one”。 结果是

Done
NewThreadScheduler没有足够的时间来启动一个新线程、安排操作并执行该操作,因此主线程可以在遍历集合之前执行。因此,有一些指南你可能没有遵循: 1)避免使用主题 ;-) 2)不要混淆线程和单元测试。我假设存在TestScheduler是因为你正在测试这个。但是你可以使用两个实例的TestScheduler,例如后台和前台实例。 为了更有帮助,我建议您从测试中删除第二个调度程序。在SubscribeOn运算符中使用TestScheduler实例。 接下来,我建议用CreateColdObservable方法替换使用subjects+调度的方法。 最后,我不知道将时间推进到1秒是否会获得任何东西,而不仅仅是使用Start方法。我认为这将减少噪音和魔法值1s的使用。
var testScheduler = new TestScheduler();

var source = testScheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));

var subscription = source.SubscribeOn(testScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.Start();

Console.WriteLine("DONE.");
Console.ReadLine();

现在唯一的问题是SubscribeOn调用相当冗余。
FYI: NewThreadScheduler的代码 - https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs

感谢您提供了如此详细的答案,这不是真正的生产代码,只是为了弄清楚事情的工作原理,所以有点奇怪!听起来将TestScheduler与“真实”计划程序混合使用会导致更多问题,所以我需要调整我的实验方式。 - OwenP
同意。可以使用测试调度程序/历史调度程序“虚拟化”时间,也可以使用真实的调度程序。 - Lee Campbell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接