我查看了
源代码 中的
ToObservable
,并提炼出了一个最小化的实现。它能够复制我们所观察到的行为。
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec);
}
else
{
observer.OnCompleted();
}
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
说完这些,问题的关键在于CurrentThreadScheduler
的行为,它是默认使用的调度程序。
CurrentThreadScheduler
的行为是,如果在调用Schedule
时一个调度程序已经在运行中,那么它最终会被排队。
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
这会打印出2 1
。队列的行为成为了我们的失败之处。
当调用observer.OnCompleted()
时,它会导致Concat
开始下一次枚举。然而,与最初不同的是,在尝试安排下一个枚举时,我们仍然处于observer => { }
块内部。因此,下一个任务并不会立即执行,而是被排入队列。
现在enumerator.MoveNext()
被卡在死锁中。它无法移动到下一个项,因为MoveNext
被阻塞,直到下一个项“到达”,而这只能通过ToObservable
循环调度来实现。
但是,调度程序只能工作以通知ToEnumerable
和随后被阻止的MoveNext()
,其中MoveNext
在第一次就被阻塞了,因此只有在它退出loopRec
之后才能继续工作 - 而它不能这样做,因为它被MoveNext
阻塞了。
附加说明
这大约是ToEnumerable
(来源于GetEnumerator.cs)的实现(不是有效的实现):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait();
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
可枚举对象在产生下一个项目之前被期望为阻塞的 - 这就是为什么需要实现门控的原因。 Enumerable.Range
不会阻塞,但是 ToEnumerable
会。
.ToObservable(Scheduler.Default)
代替。这将与你的代码一起工作。我需要花更多时间来给你解释原因。 - Enigmativityenumerable.ToObservable()
的订阅都会重新开始枚举。就像对可枚举对象的foreach
调用会重新开始枚举一样。这里的问题是由Scheduler.Immediate
调度程序引起的死锁。 - EnigmativityScheduler.CurrentThread
才会出现死锁。所以我猜这是默认设置(当没有参数调用ToObservable
时)。 - Theodor Zoulias