使用TimeSpan选择器的Observable.Generate似乎存在内存泄露问题[当使用大于15毫秒的TimeSpan时]

24

我正在调查使用Observable.Generate创建按间隔采样的结果序列的用法,以msdn网站上的示例为起点。

以下代码没有TimeSpan选择器,不会出现内存泄漏:

IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));

然而,带有 TimeSpan 选择器的以下代码会导致内存泄漏:

TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
例如,这个玩具应用程序将快速显示使用 VS 2015 社区版本附带的内存分析器显示的内存泄漏情况:
using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = Observable.Generate(1, x => x < 1000*1000, x => x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500));
            obs.Subscribe(x => { /*Do nothing but simply run the observable*/ });
            Console.ReadLine();
        }
    }
}

内存泄漏是不断增长的一组:

System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable

我是否使用这个API有误? 我应该期望内存增长吗?还是这是Reactive的一个bug?


可能正在进行多个订阅。Observable 是否在事件处理程序中构建? - supertopi
6
点赞数量令人困惑,因为任何人都可以复制粘贴并运行此代码,看到没有内存泄漏... :) - supertopi
1
@supertopi 这取决于 timeSpan 声明的位置。 - Jeroen van Langen
我已尝试使用更新的样本,没有复现。 - Asti
请注意:我重新打开了此问题,因为它并不是与 https://dev59.com/uFwY5IYBdhLWcg3waXO5?noredirect=1&lq=1 重复——TimeSpan 中微小的差异意味着示例和答案是不同的。 - James World
显示剩余2条评论
1个回答

6
这对我来说看起来像是一个 bug,或者至少是 DefaultScheduler 的“递归”调度实现中混乱/不良行为(它不是真正的递归,我说的是传递调度程序本身到已安排的操作的重载,以便您可以安排继续执行)。您看到的可处理对象是通过调用 DefaultScheduler.Schedule 方法创建的(此处的第 71 行:https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs)。
有几个原因导致其他尝试在此处发现此问题失败。首先,这些可处理对象最终会被处理 - 但仅当 Generate OnCompletes 或 OnErrors 时,此时当您订阅它时 Generate 返回的 System.Reactive.AnonymousSafeObserver 执行其清理工作。
其次,如果您使用短的 TimeSpan(请记住 .NET Timer 的最小分辨率是 15ms),那么 Rx 将优化掉定时器的使用,并调用不使用定时器的 QueueUserWorkItem,因此这些可处置对象不会被创建。
如果您深入了解 Generate 的实现(https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs),您可以看到它通过将初始调用 Schedule 返回的 IDisposable 传递回观察者来挂起它,直到出现错误/完成为止。这可以防止整个递归调用链被收集 - 这意味着如果您确实需要取消,或者当进行清理时,每个已安排操作的可处置对象才会被处理。
您可以在下面的代码中看到相同的效果,它直接使用 DefaultScheduler - 最后一行对 cancel 的引用足以导致泄漏。请确保使用发布版本,否则编译器将保留对 cancel 的引用直到方法结束。
// ensure you are using a release build of this code
ManualResetEvent mre = new ManualResetEvent();
IDisposable cancel;
int maxCount = 20;

TimeSpan timeSpan = TimeSpan.FromSeconds(1);

Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
    Console.WriteLine(state);

    if (state == maxCount)
    {
        mre.Set();
        return Disposable.Empty;
    }

    return self.Schedule(state + 1, timeSpan, recurse);
};

cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);

mre.WaitOne();

// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there's no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");

我使用Jetbrains dotMemory API获取内存转储以得出结论 - 我已经删除了上面代码中的这些API调用,但如果您有该产品,可以在此处找到完整的gist,并且您将能够清楚地看到取消注释最终一行的影响:https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c 或者,您可以使用MS分析器API - 目前我没有将其分页到我的大脑工作集中!

2
“我脑中的工作集中没有分页”这句话从现在开始我会经常使用,随着岁月的流逝,可能会更加频繁! - Sentinel
其实你能帮我解决这个问题吗?链接是https://stackoverflow.com/q/49880719/442396。虽然它应该很简单,但我还是卡住了。 - Sentinel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接