反应式限流器返回在时间间隔内添加的所有项。

11
给定一个类型为IObservable<T>的对象,是否有办法利用Throttle行为(当添加了一个项目时重置计时器,但返回该时间段内添加的所有项目的集合?)? Buffer提供了类似的功能,它将数据分块成IList<T>,每个时间间隔或计数一次。但我需要每次添加项目时重置计时器。
我在这里看到了一个类似的问题Does reactive extensions support rolling buffers?,但答案似乎不是很理想,而且有点过时,所以我想知道Rx-Main的发布版本是否现在已支持此功能。

听起来我的 https://dev59.com/Gmsz5IYBdhLWcg3w47-m#7604825 中的 BufferWithInactivity 答案就是你所需要的。你能否澄清一下你的问题? - Enigmativity
@Enigmativity,没错,那正是我想要的功能。我在我的问题中引用了那个问题 :) 但我不喜欢那个答案,答复者明确表示它还在进展中。 - RichK
不确定你在问什么。如果计时器每次“添加”(传播?)一个项目就会被重置,那么一开始会有什么可以缓冲的呢? - Asti
3个回答

14

正如我在另一个帖子中回答的,是的,你可以!使用ObservableThrottleWindow方法:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

4
很好的回答!但是为了避免两次订阅冷的可观测对象,应该使用 return stream.Publish(hot =>... 发布流,对吧? - Ziriax

4
我通过添加一个 Publish 组件改进了 Colonel Panic 的 BufferUntilInactive 运算符,使其在处理冷可观测者时能够正确工作:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

为了完整性,我还添加了一个可选的IScheduler参数,用于配置定时器运行的调度程序。

如果您需要具有 maxCount 参数的 BufferUntilInactive 变体,可以在此处查看 here - Theodor Zoulias
注意:Window运算符(Rx版本5.0)目前存在错误,可能会导致丢失值。触发该错误的条件目前尚未完全研究。 - Theodor Zoulias

0

不,时间组件会立即启动并缓冲所有内容,直到经过该时间,而计数将缓冲n个项目。我需要在添加项目时重置缓冲时间(就像节流一样)。 - RichK

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接