为什么Observable.Generate()会抛出System.StackOverflowException?

10

我正在编写一个用于聚合时间事件以进行报告的C# (.NET 4.5)应用程序。为了使我的查询逻辑可重用于实时和历史数据,我利用了Reactive Extensions (2.0)及其IScheduler基础设施(HistoricalScheduler等)。

例如,假设我们创建了一个事件列表(按时间顺序排序,但它们可能重叠!),其唯一负载是它们的时间戳,并想要知道它们在固定持续时间的缓冲区中的分布:

const int num = 100000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();

var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

var stream = Observable.Generate<int, DateTimeOffset>(
    0,
    s => s < events.Count,
    s => s + 1,
    s => events[s],
    s => events[s],
    time);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

运行此代码会导致一个System.StackOverflowException,并显示以下堆栈跟踪(最后3行):

运行此代码会导致System.StackOverflowException,并显示以下堆栈跟踪(最后3行):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...

好的,问题似乎出在我使用了Observable.Generate(),这取决于列表大小(num),无论选择哪种调度程序。

我做错了什么?或者更一般地说,从提供自己时间戳的事件的IEnumerable创建IObservable的首选方法是什么?


1
在遇到这个错误之前,num 可以有多大?此外,如果您在调试器中逐步执行此操作,那么在看到错误之前执行的最后一行代码是什么? - Jim Mischel
对我来说,关键的阈值似乎在 ~ num = 51600(在 Release 配置中稍微少一些,在 Debug 配置中稍微多一些)。可观察序列似乎已经完全创建。我可以在 Observable.Generate() 的 lambda 表达式处设置断点。异常在最后一次调用 Console.WriteLine() 后抛出。 - bastian schmick
1
理解一下,这只是一个猜测,但它看起来非常像流正在试图处理每个元素,而每个元素都在试图处理流。你最终会得到实质上是对CancelDispose的递归调用,这将使您的堆栈溢出(默认大小为1兆字节)。我不太熟悉Observable,无法说明为什么会发生这种情况。 - Jim Mischel
@Jim:非常感谢您的建议,我认为您指出了我需要的方向。请看下面的方法,它似乎可以解决我的问题,至少对于我目前的需求来说是这样。 - bastian schmick
2个回答

4

问题出在Observable.Generate的工作方式上——它用于展开一个基于参数的共递归(将递归反转)生成器;如果这些参数最终生成了一个非常嵌套的共递归生成器,你会遇到堆栈溢出的问题。

从这一点开始,我有很多猜测(没有Rx源码在我面前)(见下文),但我敢打赌你的定义最终会扩展为类似以下的内容:

(更新 - 意识到我没有提供替代方案:请参见答案底部)

initial_state =>
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) =>
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ...

一直这样下去,直到你的调用堆栈足够大而溢出。比如,在方法签名和计数器中加上int型变量,每次递归调用大约需要8-16字节(取决于状态机生成器的实现方式),所以60000左右的深度是正确的(1M / 16 = 62500)。

编辑:查看源代码确认:Generate方法的“Run”方法看起来像这样——请注意嵌套调用Generate

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink)
{
    if (this._timeSelectorA != null)
    {
        Generate<TState, TResult>.α α = 
                new Generate<TState, TResult>.α(
                     (Generate<TState, TResult>) this, 
                     observer, 
                     cancel);
        setSink(α);
        return α.Run();
    }
    if (this._timeSelectorR != null)
    {
        Generate<TState, TResult>.δ δ = 
               new Generate<TState, TResult>.δ(
                   (Generate<TState, TResult>) this, 
                   observer, 
                   cancel);
        setSink(δ);
        return δ.Run();
    }
    Generate<TState, TResult>._ _ = 
             new Generate<TState, TResult>._(
                  (Generate<TState, TResult>) this, 
                  observer, 
                  cancel);
    setSink(_);
    return _.Run();
}

编辑:抱歉,没有提供任何替代方案......这里有一个可能会起作用的:

(编辑:修复了 Enumerable.Range ,因此流大小不会乘以 chunkSize )

const int num = 160000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

    // Size too big? Fine, we'll chunk it up!
const int chunkSize = 10000;
var numberOfChunks = events.Count / chunkSize;

    // Generate a whole mess of streams based on start/end indices
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count / chunkSize) - 1)
    let startIdx = chunkIndex * chunkSize
    let endIdx = Math.Min(events.Count, startIdx + chunkSize)
    select Observable.Generate<int, DateTimeOffset>(
        startIdx,
        s => s < endIdx,
        s => s + 1,
        s => events[s],
        s => events[s],
        time);

    // E pluribus streamum
var stream = Observable.Concat(streams);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

谢谢,太完美了!似乎比我自己的解决方法更有效率。不过我在你的算术中发现了一个小错误(请参见编辑)。我仍然不太明白为什么需要在RX内部进行递归实现。毕竟,在RX v1.0中它似乎可以工作(超过60,000个大小)。还是非常好的研究,聪明的解决方案。再次感谢! - bastian schmick
没问题!嘿 - 我其实很惊讶我只犯了一个数学错误... ;) - JerKimball

3

好的,我采用了一种不需要使用lambda表达式作为状态转换的不同工厂方法,现在我不再看到任何堆栈溢出。我还不确定这是否符合我的问题的正确答案,但它可以工作,所以我想在这里分享:

var stream = Observable.Create<DateTimeOffset>(o =>
    {
        foreach (var e in events)
        {
            time.Schedule(e, () => o.OnNext(e));
        }

        time.Schedule(events[events.Count - 1], () => o.OnCompleted());

        return Disposable.Empty;
    });

在返回订阅之前手动调度事件似乎有些别扭,但在这种情况下它可以在 lambda 表达式内完成。如果这种方法有什么问题,请纠正我。此外,我仍然很高兴听到我原来的代码违反了 System.Reactive 的哪些隐含假设。(天啊,我应该早点检查:使用 RX v1.0,原始的 Observable.Generate() 确实可以工作!)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接