长时间运行的进程中是否建议使用rx distinct?

3
我正在使用rx distinct操作符来基于长时间运行的进程中某个特定的键过滤外部数据流。假设将会收到很多不同的键,这样做会导致内存泄漏吗?rx distinct操作符如何跟踪之前接收到的键?我应该使用带有持续时间选择器的groupbyuntil吗?

2
不管3D-Grabber的回答如何(它是一个很好的回答),不要忽视明显的问题,不要忘记使用内存分析器来测试你的代码。如果你担心内存泄漏,无论如何都应该这样做。如果你已经考虑过这一点,请原谅。但我经常发现人们在API中纠结于可能的泄漏问题,而只需几分钟使用分析器就可以证明API是正常的,并找到他们自己代码中的严重泄漏问题。:)至少你可以测量你特定场景的特性。 - James World
谢谢James,我采纳了你的建议,确实我们在长时间订阅中使用distinct操作符时存在内存泄漏问题。我们已经改用带有关闭持续时间选择器的group by操作符,解决了内存泄漏问题。 - Misterhex
2个回答

5

Observable.Distinct使用HashSet内部。内存使用量大致与遇到的唯一键数成比例。(据我所知,约为30*n字节)

GroupByUntilDistinct非常不同。
GroupByUntil(很好地)进行分组,而Distinct过滤流中的元素。

不确定预期的用途,但如果您只想过滤连续的相同元素,则需要使用Observable.DistinctUntilChanged,其内存占用与键数无关。


使用 HashSet,而不是 HashMap。在 HashSet 中,所有的键都是唯一的。"对于非常大的 HashSet<T> 对象,在运行时环境中将 gcAllowVeryLargeObjects 配置元素的 enabled 属性设置为 true,可以将最大容量增加到 20 亿个元素,适用于 64 位系统"。 - Richard Anthony Freeman-Hein
HashSet,不是HashMap。已修复,谢谢,我被困在Java宇宙中了。 - 3dGrabber

1
这可能是一种有争议的策略,但如果您担心不同的键会累积,并且如果有一个安全重置的时间点,您可以使用Observable.Switch引入重置策略。例如,我们有一个场景,在这个场景中,“世界的状态”每天都会被重置,因此我们可以每天重置不同的可观察对象。
Observable.Create<MyPoco>(
    observer =>
    {
        var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));

        var timerSubscription =
            Observable.Timer(
                new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)),
                TimeSpan.FromDays(1),
                schedulerService.Default).Subscribe(
                    t =>
                    {
                        Log.Info("Daily reset - resetting distinct subscription.");
                        distinctPocos.OnNext(pocos.Distinct(x => x.Id));
                    });

        var pocoSubscription = distinctPocos.Switch().Subscribe(observer);

        return new CompositeDisposable(timerSubscription, pocoSubscription);
    });

然而,我倾向于同意James World在上面的评论中提到的使用内存分析器进行测试,以检查内存是否确实是一个问题,然后再引入潜在的不必要的复杂性。如果您正在累积32位整数作为键,则在大多数平台上,在遇到内存问题之前,您将拥有数百万个唯一项目。例如,262144个32位整数键将占用1兆字节。根据您的情况,可能会在此之前长时间重置进程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接