正如 Rohit Sharma 在 Colonel Panic 的 solution 中所提到的,存在一个问题,即项目将被缓冲而不会被推送给订阅者,除非生成一个项目。
正如在这个 comment 中所描述的那样,问题在于 p.Window(() => closes)
,因为它打开了一个间隙,在这个间隙中可能会错过事件。
每次处理窗口后,该 lambda 将被调用。 Window 运算符将在每次调用 Subscribe 时调用 lambda 返回的内容,因为就其而言,你每次都可能从 lambda 返回完全不同的 IObservable。
由于现在始终使用相同的 lambda,我们需要调整 maxCount。如果不进行更改,则 maxCount 永远不会被重置,一旦它达到最大值,每个新事件都将超过 maxCount。
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
var publish = stream.Publish(p =>
{
var closes = p.Throttle(delay);
if (maxCount != null)
{
Int32 i = 0;
var overflows = p.Where(x =>
{
++i;
if (i >= maxCount)
{
i = 0;
return true;
}
return false;
});
closes = closes.Merge(overflows);
}
return p.Window(closes).SelectMany(window => window.ToList());
});
return publish;
}
更新:
经过进一步的测试,我发现仍然有些情况下,项目将不能正确地推送给订阅者。
这里是我们已经使用4个月而没有任何问题的解决方法。
解决方法是添加 .Delay(...)
与任何 TimeSpan
。
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
var publish = stream.Publish(p =>
{
var closes = p.Throttle(delay);
if (maxCount != null)
{
var overflows = stream.Where((x, index) => index + 1 >= maxCount);
closes = closes.Merge(overflows);
}
return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
});
return publish;
}