在使用Reactive Extensions for .NET中的Observable.FromEventPattern时,我该如何避免任何阻塞?

10

我在使用TaskPoolScheduler订阅Observable.FromEventPattern()时遇到了一些并发问题。

下面是一个代码示例:

var dataStore = new DataStore();

Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .Select(x => 
    {
        Thread.Sleep(5000); // Simulate long-running calculation.
        var result = 42;
        return result;
    })
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });

dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.

我的问题是:当原始可观察对象 Observable.FromEventPattern() 发出任何新项(即当 DataStore 对象引发新的 DataChanged 事件时)时,它们似乎被阻塞等待前一项完成整个管道的流程。
由于订阅是在 TaskPoolScheduler 上完成的,我原本期望每个新发出的项都会简单地启动一个新任务,但实际上,如果管道忙碌时,事件的源头似乎会在事件调用上阻塞。
如何才能实现一个订阅,使得每个新发出的项(引发的事件)都在自己的任务/线程上执行,以便源对象永远不会在其内部的 DataChangedEvent.Invoke() 调用上阻塞?
(当然,Subscribe() lambda 应该在 UI 线程上执行-这已经是这种情况了。)
顺便说一句:@jonstodle 在 #rxnet Slack 频道中提到 TaskPoolScheduler 可能具有不同的语义,与我所假设的不同。具体来说,他说它可能创建一个任务,并在其中一个任务的事件循环中同时进行订阅和值的生成。但如果是这样的话,那么我就发现第一个事件调用并不会阻塞(因为第二个事件调用会阻塞)。我认为,如果任务池任务在订阅方面足够异步,使得源不必在第一个调用上阻塞,那么就没有必要在第二个调用上阻塞了,这对我来说有点奇怪。
1个回答

12
你遇到的问题其实是 Rx 的工作原理 - 在常规的 Rx 流水线中,每个产生的值都会被流水化处理,一次只处理一个值。如果 Rx 流水线的源(在你的情况下是 FromEventPattern<DataChangedEventArgs>)产生的值比观察者处理它们的速度更快,则它们会在流水线中排队等待处理。
规则是流水线中的每个观察者一次只处理一个值。这适用于任何计划程序,而不仅仅是 TaskPoolScheduler
使其按照你想要的方式工作的方法非常简单-创建并行管道,然后将值合并回单个管道。
这是更改内容:
Observable
    .FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(5000); // Simulate long-running calculation.
            var result = 42;
            return result;
        }))
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent();
    });

.SelectMany(x => Observable.Start(() =>取代了.Select(x =>,允许值成为一个新的可观察订阅,该订阅立即运行,然后将其值合并回单个可观察对象。

您可能更喜欢将其写成语义上相同的.Select(x => Observable.Start(() => ...)).Merge()

以下是一个简单的示例,展示了它的工作原理:

var source = new Subject<int>();

source
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return x * 2;
        }))
    .Subscribe(result =>
    {
        Console.WriteLine(result);
        source.OnNext(result);
        source.OnNext(result + 1);
    });

source.OnNext(1);

它产生:

2
4
6
14
12
8
10
24
28
30
26
16
20
22
18
48
50
56
52
58
60
62
54
32
34
46
44
40
42

哇,太棒了,非常感谢!你的解释和建议修复都非常有道理。但是还有一件事我还不太明白——如果管道只能一次处理一个项目,为什么第一个事件调用不会阻塞呢?或者反过来说,如果后续的项目像你说的那样排队等待处理,为什么不能将这些项目放入队列以供将来处理,同时立即返回控制权并允许调用代码继续执行? - Daniel Rosenberg
@DanielStolt - 这取决于可观察对象正在处理的线程。这段代码 Observable.Range(0, 10).Subscribe(x => Console.WriteLine(x)); Console.WriteLine("Done."); 在一个线程上执行,因此数字出现在单词 Done. 之前,但是这段代码 Observable.Range(0, 10).ObserveOn(Scheduler.Default).Subscribe(x => Console.WriteLine(x)); Console.WriteLine("Done."); 被推送到一个新线程,所以 Done. 出现在中间。 - Enigmativity
关于您的第二条评论,这绝对不是我观察到的(有意而言)。在DataStore中,对DataChanged.Invoke()首次调用肯定会立即返回,尽管Select()委托需要几秒钟才能完成。只有随后的调用会阻塞,直到第一项完全通过整个管道。我发现这种不对称奇怪。 - Daniel Rosenberg
@DanielStolt - 如果您删除.SubscribeOn(TaskPoolScheduler.Default),我很想知道该行为是否会发生变化? - Enigmativity
1
是的,正如预期的那样,如果我将其移除,则调用将完全同步运行,即“DataChanged.Invoke()`”调用会阻塞直到整个管道返回。 - Daniel Rosenberg
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接