响应式编程扩展是否支持滚动缓冲区?

27

我正在使用反应式扩展将数据聚合到100毫秒的缓冲区中:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

这个代码可以正常工作。然而,我想要的行为略有不同于Buffer操作提供的行为。实际上,我希望在接收到其他数据项时重置计时器。只有当整个100ms内没有收到任何数据时才处理它。这打开了永远不处理数据的可能性,因此我还应该能够指定最大计数。我想象中的代码应该是这样的:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

我已经查过了,但是在Rx中没有找到类似的东西?有人可以证实/否认这一点吗?


我确定我在 Rx 的教程视频中看到过这种行为,但我不记得具体是什么或在哪里看到的。 :( - Chris
啊,节流(http://msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx)是我想到的,但我不认为它单独能做到你想要的。不确定是否有某种方法可以将其组合起来以实现所需的功能... - Chris
6个回答

19

通过组合内置的WindowThrottle方法可以实现。这两个方法都在Observable类中定义。首先,让我们解决简单问题,即忽略最大计数条件:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

强大的Window方法完成了繁重的工作。现在很容易看到如何添加最大计数:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

我将在我的博客上写一篇文章来解释这个问题。 https://gist.github.com/2244036

Window方法的文档:


3
通过上述的BufferUntilInactive场景 - 如果订阅者比生产者慢,您可能会看到下一组窗口化项目将被缓冲,并且除非生成一个项目,否则不会推送给订阅者... - Rohit Sharma
我已经附上一个示例http://snipt.org/Bhao0。在Visual Studio中,(1)打开输出窗口,(2)检查挂起按钮,(3)单击该按钮,(4)等待它在控制台上打印“现在点击”,(5)按下按钮三次,你会发现这三个点击被忽略了。 - Rohit Sharma
目前这个解决方案只能正确地处理热序列。为了让它也能处理冷序列,需要添加一个 Publish 组件,如此处所示。 - Theodor Zoulias

16

我编写了一个扩展程序,可以完成大部分你想要的功能 - BufferWithInactivity

这是它的代码:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}

+1 谢谢。这段代码是你为了这个问题写的还是为了自己?它已经在生产代码中使用过了吗? - Kent Boogaart
@KentBoogaart - 我几个月前写了它,但它还没有进入生产代码。它仍然是一个正在进行的项目。 - Enigmativity

2

使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区过载来满足这两个要求:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100), 1)
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

请参考文档https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

但这不会有一个滑动窗口,并且也不会有所请求的“去抖动”行为吗? - Cocowalla
@Cocowalla 我重新阅读了原始问题和我提供的代码,它确实满足所有要求。我已经在生产代码中使用过,并取得了巨大的成功。 - Sébastien Lorion
抱歉,我特别指的是去抖动行为:“如果接收到另一个数据项,则要重置计时器”- 我没有看到你的代码这样做?据我所知,只要缓冲区不为空,你的代码将始终每100毫秒向订阅者推送缓冲区。 - Cocowalla
我现在明白你所说的去抖动(debounce)是什么意思了,不过我对这个术语的理解更像是 http://reactivex.io/documentation/operators/debounce.html 中的 Observable.Throttle。你所要求的更加复杂,但我猜可以用 Observable.Window 来实现。无论如何,除非我漏掉了什么,我的答案与此问题上被接受的答案完全相同。 - Sébastien Lorion
1
不,这个答案的行为与被接受的答案不同。根据要求,被接受的答案正确地推迟了在源可观察对象持续活动的情况下发出缓冲区。而这个答案只是每100毫秒发出一次缓冲区。 - Theodor Zoulias

2

正如 Rohit Sharma 在 Colonel Panic 的 solution 中所提到的,存在一个问题,即项目将被缓冲而不会被推送给订阅者,除非生成一个项目。

正如在这个 comment 中所描述的那样,问题在于 p.Window(() => closes),因为它打开了一个间隙,在这个间隙中可能会错过事件。

每次处理窗口后,该 lambda 将被调用。 Window 运算符将在每次调用 Subscribe 时调用 lambda 返回的内容,因为就其而言,你每次都可能从 lambda 返回完全不同的 IObservable。

由于现在始终使用相同的 lambda,我们需要调整 maxCount。如果不进行更改,则 maxCount 永远不会被重置,一旦它达到最大值,每个新事件都将超过 maxCount。

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            Int32 i = 0;

            var overflows = p.Where(x =>
            {
                ++i;

                if (i >= maxCount)
                {
                    i = 0;
                    return true;
                }

                return false;
            });

            closes = closes.Merge(overflows);
        }

        return p.Window(closes).SelectMany(window => window.ToList());
    });

    return publish;
}

更新:
经过进一步的测试,我发现仍然有些情况下,项目将不能正确地推送给订阅者。

这里是我们已经使用4个月而没有任何问题的解决方法。

解决方法是添加 .Delay(...) 与任何 TimeSpan

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            var overflows = stream.Where((x, index) => index + 1 >= maxCount);
            closes = closes.Merge(overflows);
        }

        return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
    });

    return publish;
}

0

我想这可以通过如下所示的Buffer方法来实现:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
        {
            return Observable.CreateWithDisposable<IList<T>>(cl =>
            {
                var acc = new List<T>();
                return obs.Buffer(span)
                        .Subscribe(next =>
                        {
                            if (next.Count == 0) //no activity in time span
                            {
                                cl.OnNext(acc);
                                acc.Clear();
                            }
                            else
                            {
                                acc.AddRange(next);
                                if (acc.Count >= max) //max items collected
                                {
                                    cl.OnNext(acc);
                                    acc.Clear();
                                }
                            }
                        }, err => cl.OnError(err), () => { cl.OnNext(acc); cl.OnCompleted(); });
            });
        }

注意:我没有测试过,但希望它能给你一个想法。

0

Colonel Panic的solution几乎完美。唯一缺少的是一个Publish组件,以便使解决方案也适用于冷序列。

/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, int maxCount,
    IScheduler scheduler = default)
{
    if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
    {
        var combinedBoundaries = Observable.Merge
        (
            published.Throttle(dueTime, scheduler),
            published.Skip(maxCount - 1)
        );

        return published
            .Window(() => combinedBoundaries)
            .SelectMany(window => window.ToList());
    });
}

除了添加“Publish”之外,我还将原始的“.Where((_, index) => index + 1 >= maxCount)”替换为等效但更短的“.Skip(maxCount - 1)”。为了完整起见,还有一个“IScheduler”参数,用于配置定时器运行的调度程序。

如果您需要一个更简单的 BufferUntilInactive 变体,它不包括 maxCount 参数,您可以在这里查看。 - Theodor Zoulias
注意:目前在Window操作符(Rx版本5.0)中存在一个错误,可能导致值丢失。目前尚未完全研究出触发该错误的条件。 - Theodor Zoulias
注意:目前 Window 运算符(Rx 5.0 版本)存在一个漏洞,可能会导致某些值丢失。目前尚未完全研究触发此漏洞的条件。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接