反应式扩展:使用不同间隔的节流/采样

8
我有一个IObservable,它会在随机时间间隔内产生值,我想对这个序列进行节流。我发现Throttle操作符的“节流”定义与我的不同。 Throttle只有在指定的时间间隔没有声音时才会产生值(它会产生最后一个看到的值)。我认为节流应该是按照指定的时间间隔产生值(当然,除非有沉默)。
例如,我期望Observable.Interval(100).Select((_,i) => i).Throttle(200)会产生偶数(除了任何性能/时间问题),因为我将其节流到“半速”。但是这个序列根本不产生任何值,因为从来没有长度为200的沉默期。
所以,我发现Sample实际上执行了我想要的“节流”行为。Observable.Interval(100).Select((_,i) => i).Sample(200)会产生(同样,除了任何性能/时间问题)偶数序列。
然而,我还有一个问题:间隔时间取决于最后“采样”的值。我想编写一个操作符,像这样:
public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);
intervalSelector参数确定下一个样本的时间间隔,第一个样本要么从第一个值开始,要么从另一个参数开始,我不在意。
我尝试写了一下,但最终得到了一个复杂的结构,并且不能正常工作。我的问题是,我能否使用现有的运算符(即用一行代码)构建它?
3个回答

6
许多小时后,我经过一些休息,终于明白了。
public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
    return source.TimeInterval()
                 .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
                 {
                     if(v.Interval >= acc.Item1)
                     {
                         return Tuple.Create(intervalSelector(v.Value), true, v.Value);
                     }
                     return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
                 })
                 .Where(t => t.Item2)
                 .Select(x => x.Item3);
}

这个方法可以按照我的要求工作:每次生成一个值 x,就停止生成值,直到经过 intervalSelector(x) 的时间。

0

我用另一种方式实现了它,我想与任何有用的人分享。

var random = new Random();
Observable.Return(Unit.Default)
            .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(random.Next(1, 6))))
            .TimeInterval()
            .Do(value => Console.WriteLine(value))
            .Repeat()
            .Subscribe();

0

BufferWithTime和其他缓冲区一样存在同样的缺陷:时间间隔是恒定的。我需要计算从上一个采样值开始等待多长时间才能取下一个样本。我会尝试为此绘制一个弹珠图... - R. Martinho Fernandes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接