为什么重复的 Enumerable 转 Observable 会阻塞

6

这是一个相当有教育意义的问题,出于好奇,请考虑以下代码片段:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

SubscribeDebug 订阅简单的观察者:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

这段代码的输出如下:

值: 0

值: 1

值: 2

值: 3

值: 4

然后是一些块。有人可以帮我理解它发生的基本原因以及为什么 observable 没有完成吗?我已经注意到没有 Concat 调用时,它会完成,但使用它却阻塞了。

当您连接一个已经完成的不同的可观察对象时,这种行为是否也存在? - Progman
1
你的代码由于使用的调度程序而创建了死锁。尝试使用.ToObservable(Scheduler.Default)代替。这将与你的代码一起工作。我需要花更多时间来给你解释原因。 - Enigmativity
1
@Progman - 你走错了方向。每个对 enumerable.ToObservable() 的订阅都会重新开始枚举。就像对可枚举对象的 foreach 调用会重新开始枚举一样。这里的问题是由 Scheduler.Immediate 调度程序引起的死锁。 - Enigmativity
3
问题似乎不在于 Scheduler.Immediate,因为当我将其传递给 ToObservable() 时,两个枚举都会被迭代。但是,如果在没有任何调度程序实现的情况下调用它,代码会被阻塞。 - Oguz Ozgul
2
@OguzOzgul,只有使用所有静态调度程序中的Scheduler.CurrentThread才会出现死锁。所以我猜这是默认设置(当没有参数调用ToObservable时)。 - Theodor Zoulias
1个回答

9
我查看了 源代码 中的 ToObservable,并提炼出了一个最小化的实现。它能够复制我们所观察到的行为。
    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            {
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                {
                    if (enumerator.MoveNext()) 
                    {
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                }

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            }
        );

说完这些,问题的关键在于CurrentThreadScheduler的行为,它是默认使用的调度程序。

CurrentThreadScheduler的行为是,如果在调用Schedule时一个调度程序已经在运行中,那么它最终会被排队。

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

这会打印出2 1。队列的行为成为了我们的失败之处。

当调用observer.OnCompleted()时,它会导致Concat开始下一次枚举。然而,与最初不同的是,在尝试安排下一个枚举时,我们仍然处于observer => { }块内部。因此,下一个任务并不会立即执行,而是被排入队列。

现在enumerator.MoveNext()被卡在死锁中。它无法移动到下一个项,因为MoveNext被阻塞,直到下一个项“到达”,而这只能通过ToObservable循环调度来实现。

但是,调度程序只能工作以通知ToEnumerable和随后被阻止的MoveNext(),其中MoveNext在第一次就被阻塞了,因此只有在它退出loopRec之后才能继续工作 - 而它不能这样做,因为它被MoveNext阻塞了。

附加说明

这大约是ToEnumerable(来源于GetEnumerator.cs)的实现(不是有效的实现):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, 
            () => gate.Release()))
        while (true)
        {
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }

可枚举对象在产生下一个项目之前被期望为阻塞的 - 这就是为什么需要实现门控的原因。 Enumerable.Range 不会阻塞,但是 ToEnumerable 会。


但是我实现了一个自定义的IEnumerable<int>并返回了一个自定义的IEnumerator<int>,我看到的是,当第一个枚举器的迭代完成时,GetEnumerator()再次被调用(由我返回一个新的),但MoveNext()从未被调用。 - Oguz Ozgul
1
我应该澄清一下 - 重要的不是你自己的 IEnumerable,而是由 Observable.ToEnumerable() 返回的 IEnumerable。那是会阻塞的地方。 - Asti
1
@TheodorZoulias 谢谢!一旦我实现了 ToObservable,问题就变得更加清晰了 - Rx堆栈跟踪相当难以阅读。编写简化的Rx运算符以理解它们的行为是一个很好的学习经验。 :) - Asti
1
太棒了!感谢您提供如此详细的答案! - Martin Zikmund
2
太棒了的回答! - Enigmativity
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接