从Observable中获取每N个元素

4

我需要观察一个以固定时间间隔产生值的流,但是产生的值太多了,因此我希望每十个值中取一个。

非响应式编程的等价方式如下:

int step = 10;  // take every 10th value
var numbers = Enumerable.Range(0, 100).Where((e, i) => i % step == 0);

在Rx中,最常用的方法是什么?

3
你可以尝试以下代码:Observable.Range(0, 100).Where((value, index) => index % step == 0)。意思是筛选出从0到99的数字序列中,满足索引值除以给定步长无余数的元素。 - Evk
1
考虑在 Rx 中使用 Throttle。http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html - MistyK
1
@MistyK 不幸的是,节流并不能解决问题,因为我的元素定期到达,并且根据文档所述,“节流方法仅对以可变速率产生值的序列有用。以恒定速率产生值的序列(例如Interval或Timer)如果产生的值比 节流周期 更快,所有的值都将被抑制; 如果它们比 节流周期 慢,则所有的值都将被传播。”这是因为Throttle是基于时间而不是计数/索引的。 - heltonbiker
2个回答

3
我会使用WhereBuffer
int step = 10;  // take every 10th value

// Where
var numbers = Observable.Range(0, 100).Where((e, i) => i % step == 0);

// Buffer
numbers = Observable.Range(0, 100).Buffer(step).Select(x => x[step - 1]);

FYI:这是一个可行的,但是快速而粗糙的解决方案。如果“every”是3或5,那么没有问题,但如果是100,并且物品很重,可能会导致性能损失。如果我正确阅读了https://github.com/dotnet/reactive/blob/9a46e892bc761165664cb171f29724f5e55be827/AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Buffer.cs,Buffer()为每个周期创建一个新缓冲区,因此我们每次都有一个新数组或列表,加上所有缓存的元素,只是为了选择最后一个并留下大量垃圾供GC清理。TheodorZoulias的答案要好得多! - quetzalcoatl

3

我建议避免使用具有 Func<TSource, int, bool> predicate 参数的 Where 运算符,以免因源序列产生超过 2,147,483,647 个元素而导致 OverflowException 异常。无论是 Linq 还是 Rx 运算符都允许算术溢出发生,我不知道任何可防止此情况发生的解决方法。

这里提供了一个名为 TakeEvery 的自定义运算符,在这方面是安全的:

/// <summary>Samples the observable sequence by returning one every N elements,
/// ignoring the other elements.</summary>
public static IObservable<T> TakeEvery<T>(this IObservable<T> source, int step)
{
    if (step < 1) throw new ArgumentOutOfRangeException(nameof(step));
    return Observable.Defer(() =>
    {
        int index = -1;
        return source.Where(value => (index = ++index % step) == 0);
    });
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接