我在使用TaskPoolScheduler
订阅Observable.FromEventPattern()
时遇到了一些并发问题。
下面是一个代码示例:
var dataStore = new DataStore();
Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
.SubscribeOn(TaskPoolScheduler.Default)
.Select(x => x.EventArgs)
.StartWith(new DataChangedEventArgs())
.Throttle(TimeSpan.FromMilliseconds(25))
.Select(x =>
{
Thread.Sleep(5000); // Simulate long-running calculation.
var result = 42;
return result;
})
.ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
.Subscribe(result =>
{
// Do some interesting work with the result.
// ...
// Do something that makes the DataStore raise another event.
dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
});
dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.
我的问题是:当原始可观察对象 Observable.FromEventPattern() 发出任何新项(即当 DataStore 对象引发新的 DataChanged 事件时)时,它们似乎被阻塞等待前一项完成整个管道的流程。
由于订阅是在 TaskPoolScheduler 上完成的,我原本期望每个新发出的项都会简单地启动一个新任务,但实际上,如果管道忙碌时,事件的源头似乎会在事件调用上阻塞。
如何才能实现一个订阅,使得每个新发出的项(引发的事件)都在自己的任务/线程上执行,以便源对象永远不会在其内部的 DataChangedEvent.Invoke() 调用上阻塞?
(当然,Subscribe() lambda 应该在 UI 线程上执行-这已经是这种情况了。)
顺便说一句:@jonstodle 在 #rxnet Slack 频道中提到 TaskPoolScheduler 可能具有不同的语义,与我所假设的不同。具体来说,他说它可能创建一个任务,并在其中一个任务的事件循环中同时进行订阅和值的生成。但如果是这样的话,那么我就发现第一个事件调用并不会阻塞(因为第二个事件调用会阻塞)。我认为,如果任务池任务在订阅方面足够异步,使得源不必在第一个调用上阻塞,那么就没有必要在第二个调用上阻塞了,这对我来说有点奇怪。
Observable.Range(0, 10).Subscribe(x => Console.WriteLine(x)); Console.WriteLine("Done.");
在一个线程上执行,因此数字出现在单词Done.
之前,但是这段代码Observable.Range(0, 10).ObserveOn(Scheduler.Default).Subscribe(x => Console.WriteLine(x)); Console.WriteLine("Done.");
被推送到一个新线程,所以Done.
出现在中间。 - EnigmativityDataStore
中,对DataChanged.Invoke()
的首次调用肯定会立即返回,尽管Select()
委托需要几秒钟才能完成。只有随后的调用会阻塞,直到第一项完全通过整个管道。我发现这种不对称奇怪。 - Daniel Rosenberg.SubscribeOn(TaskPoolScheduler.Default)
,我很想知道该行为是否会发生变化? - Enigmativity