Throttling an IObservable by value


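I have an IObservable<String>.

I'm trying to detect (and handle) the case where the same string is notified in quick succession.

I want a filter/stream/observable such that, if the same string is notified more than once within 250 ms, it is notified only once.

Not sure where to start.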

2 Answers


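Here's a pretty compact solution. Your post is a little ambiguous about whether the duration should reset as soon as a different value turns up, so I've provided solutions for both interpretations.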

Variation 1 - Intervening different values do not reset the timer

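This is the case where you care strictly about the suppression duration and don't mind whether any "intervening" values arrive (this matches McGarnagle's solution). For example, if you get "a", "b", "a" in quick succession, you still want to suppress the second "a". Fortunately this is very easy with GroupByUntil, which groups each value for the duration and emits the first element of each group: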

    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.GroupByUntil(k => k,
                                   _ => Observable.Timer(duration, scheduler))
                     .SelectMany(y => y.FirstAsync());
    }
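The grouping behaviour can be checked without Rx at all. Below is a minimal C# sketch under assumptions of my own, not the answer's: SuppressPerValueWindow is a hypothetical helper, events are (time, value) pairs with time in ticks as in the TestScheduler tests, and an event arriving exactly when a window ends counts as outside it (in Rx that tie depends on how the timer and the source are ordered on the same tick). Each emitted value opens a window for that value only, which is roughly what a GroupByUntil group with a Timer duration amounts to.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free model of Variation 1 (not the answer's code).
// Events are (Time, Value) pairs, Time in ticks like TestScheduler's virtual time.
// Emitting a value opens a window of 'duration' ticks for that value only; further
// copies of it inside the window are dropped, and other values never touch it.
List<(long Time, string Value)> SuppressPerValueWindow(
    IEnumerable<(long Time, string Value)> events, long duration)
{
    var windows = new List<(string Value, long End)>(); // last window opened per value
    var emitted = new List<(long Time, string Value)>();
    foreach (var e in events)
    {
        // string == treats two nulls as equal, so null gets its own window too
        var i = windows.FindIndex(w => w.Value == e.Value);
        if (i >= 0 && e.Time < windows[i].End)
            continue; // same value inside its window: suppress

        // first occurrence, or the window has expired: emit and (re)open the window
        if (i >= 0) windows[i] = (e.Value, e.Time + duration);
        else windows.Add((e.Value, e.Time + duration));
        emitted.Add(e);
    }
    return emitted;
}

// Same input as TestWithSeveralValuesVariation1
var result = SuppressPerValueWindow(
    new[] { (100L, "a"), (200L, "a"), (300L, "b"), (350L, "c"), (450L, "b"), (900L, "a") }, 250);
Console.WriteLine(string.Join(", ", result.ConvertAll(e => $"{e.Time}:{e.Value}")));
```

Fed the input of TestWithSeveralValuesVariation1, it prints 100:a, 300:b, 350:c, 900:a, the same messages that test expects; the "b" at 450 is dropped even though "c" arrived in between.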

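If you are wondering about the method name: I came up with Variation 2b first, and I've left the name as it is so that the unit tests still pass. It could do with a better name, such as SuppressDuplicatesWithinWindow or similar...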

Variation 2a - Intervening different values reset the timer

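This is slightly more involved: now an event from any other group ends a given group. I use the Publish().RefCount() combination to avoid multiple subscriptions to the source, and I have to be very careful with nulls: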

public static IObservable<T> DistinctUntilChanged<T>(
    this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
    if (scheduler == null) scheduler = Scheduler.Default;

    var sourcePub = source.Publish().RefCount();

    return sourcePub.GroupByUntil(
        k => k,
        x => Observable.Timer(duration, scheduler)
                       .TakeUntil(
                           sourcePub.Where(i => ReferenceEquals(null, i)
                                                ? !ReferenceEquals(null, x.Key)
                                                : !i.Equals(x.Key))))
        .SelectMany(y => y.FirstAsync());
}

Variation 2b

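This was my original approach; I've kept it because my fixes to 2a made 2a more complicated:

It is a variant of Observable.DistinctUntilChanged that takes a duration parameter. Given an event, consecutive duplicates of it within that duration are ignored. If a different event arrives, or a duplicate arrives after the duration has elapsed, it is emitted and the suppression timer is reset.

It works by using the overload of DistinctUntilChanged that accepts an IEqualityComparer. After Timestamp has been applied to each event, the comparer treats two events as equal if their values match and their timestamps are within the specified duration of each other.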

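using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static partial class ObservableExtensions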
{
    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.Timestamp(scheduler)
                     .DistinctUntilChanged(new Comparer<T>(duration))
                     .Select(ts => ts.Value);
    }

    private class Comparer<T> : IEqualityComparer<Timestamped<T>>
    {
        private readonly TimeSpan _duration;

        public Comparer(TimeSpan duration)
        {
            _duration = duration;
        }

        public bool Equals(Timestamped<T> x, Timestamped<T> y)
        {
            if (y.Timestamp - x.Timestamp > _duration) return false;

            // two nulls (or the same reference) are equal; otherwise defer to Equals
            return ReferenceEquals(x.Value, y.Value)
                   || (!ReferenceEquals(null, x.Value)
                       && x.Value.Equals(y.Value));
        }

        public int GetHashCode(Timestamped<T> obj)
        {
            if (ReferenceEquals(null,obj.Value)) return obj.Timestamp.GetHashCode();
            return obj.Value.GetHashCode() ^ obj.Timestamp.GetHashCode();
        }
    }
}
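Because Rx's DistinctUntilChanged only compares each new element with the last one it let through, Variation 2b's behaviour can likewise be modelled in a few lines of plain C#. This is an illustrative sketch, not the answer's code: SuppressConsecutiveWithinDuration is a hypothetical helper, times are ticks, and string == stands in for the intended null-aware equality (two nulls equal, as the null tests expect).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free model of Variation 2b (not the answer's code).
// Each event is checked only against the last *emitted* event and dropped when the
// values are equal (two nulls count as equal) and it is at most 'duration' ticks later.
List<(long Time, string Value)> SuppressConsecutiveWithinDuration(
    IEnumerable<(long Time, string Value)> events, long duration)
{
    var emitted = new List<(long Time, string Value)>();
    foreach (var e in events)
    {
        if (emitted.Count > 0)
        {
            var last = emitted[emitted.Count - 1];
            if (last.Value == e.Value && e.Time - last.Time <= duration)
                continue; // duplicate of the last emitted value within the window
        }
        emitted.Add(e); // first event, a different value, or the window has passed
    }
    return emitted;
}

// Same input as TestWithSeveralValues
var result = SuppressConsecutiveWithinDuration(
    new[] { (100L, "a"), (200L, "a"), (300L, "b"), (350L, "c"), (450L, "b"), (900L, "a") }, 250);
Console.WriteLine(string.Join(", ", result.ConvertAll(e => $"{e.Time}:{e.Value}")));
```

On the TestWithSeveralValues input it prints 100:a, 300:b, 350:c, 450:b, 900:a, matching that test. Fed "a", "b", "a", "b" at ticks 100, 150, 200 and 250 it keeps all four, since each differs from the value before it; that is the a-b-a-b case McGarnagle raises in the comments, and the reason Variation 1 exists.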

Here are the unit tests I used (they need the Rx-Testing and NUnit NuGet packages):

using System;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class TestDistinct : ReactiveTest
{
    [Test]
    public void DuplicateWithinDurationIsSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void NonDuplicationWithinDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(200, "b"));
    }

    [Test]
    public void DuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "a"));
    }

    [Test]
    public void NonDuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "b"));
    }

    [Test]
    public void TestWithSeveralValues()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));
    }

    [Test]
    public void CanHandleNulls()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null),
            OnNext(700, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null));
    }

    [Test]
    public void TwoDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(150, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void TwoNullDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, (string)null),
            OnNext(150, (string)null),
            OnNext(200, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, (string)null));
    }
}

Finally, for completeness: Variation 1 passes the following variant of the several-values test:

    [Test]
    public void TestWithSeveralValuesVariation1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(900, "a"));
    }

and the null test changes to end with:

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(700, (string)null)); /* This line changes */

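Note that this works well if your goal is to drop consecutive duplicates. It doesn't work, however, if you want to drop duplicates from something like a-b-a-b arriving in quick succession. I think the OP is ambiguous on this point. (My reference point is "PropertyChanged" events, where you want to throttle duplicates but the duplicates don't necessarily arrive consecutively.) - McGarnagle
Added a simpler variation (Variation 1) to cover McGarnagle's alternative interpretation. - James World
Edit: inspired by Variation 1, I found a way to simplify Variation 2 that doesn't need a comparer. - James World
This is really great. I've been looking for an elegant solution to a similar problem (preferably your "Variation 2"), and now I've finally found one. I'd upvote it again if I could. - Kirill Shlenskiy
Amended Variation 2: it was incorrect when there are several duplicates within the interval. Added a fix and a couple of tests to cover it; there seems to be no way to avoid the fiddly null checks... - James World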


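You're looking for Observable.Throttle:

Ignores the values from an observable sequence which are followed by another value before due time with the specified source and dueTime.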
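A rough plain-C# model of that documented behaviour shows why Throttle on its own does not answer the question. Assumptions: ThrottleModel is a hypothetical helper, times are strictly increasing ticks, completion of the source is not modelled, and a value followed by another exactly dueTime later is treated as surviving.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free model of Throttle's documented behaviour (not Rx's implementation).
// Assumes strictly increasing tick times and a source that never completes.
// A value is dropped if *any* next value arrives less than dueTime after it;
// otherwise it is emitted dueTime ticks after it arrived.
List<(long Time, string Value)> ThrottleModel(
    IReadOnlyList<(long Time, string Value)> events, long dueTime)
{
    var emitted = new List<(long Time, string Value)>();
    for (var i = 0; i < events.Count; i++)
    {
        var e = events[i];
        var superseded = i + 1 < events.Count && events[i + 1].Time - e.Time < dueTime;
        if (!superseded)
            emitted.Add((e.Time + dueTime, e.Value)); // survivor, delayed by dueTime
    }
    return emitted;
}

var result = ThrottleModel(new[] { (100L, "a"), (200L, "a"), (300L, "b") }, 250);
Console.WriteLine(string.Join(", ", result.ConvertAll(e => $"{e.Time}:{e.Value}")));
```

With "a" at 100, "a" at 200 and "b" at 300 and a dueTime of 250, only "b" survives, at 550: the second "a" is swallowed because a different value follows it, and the survivor is delayed by the full dueTime.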

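Edit

OK, so the above only throttles all the elements in the sequence, rather than throttling per key as the OP asked. I thought that would be an easy next step, but maybe it isn't so simple? (F# has a split function that might help, but there is apparently no C# equivalent.)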

So here is an attempt at implementing Split:

using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Extension
{
    public static IDisposable SplitSubscribe<T, TKey>(
        this IObservable<T> source, 
        Func<T, TKey> keySelector, 
        Action<IObservable<T>> subscribe)
    {
        // maintain a list of Observables, one for each key (TKey);
        // each Subject carries the source items (T) that map to its key
        var observables = new ConcurrentDictionary<TKey, Subject<T>>();

        return source.Subscribe(next =>
        {
            var key = keySelector(next);
            observables.AddOrUpdate(
                key,
                // first item for this key: create a Subject, hand it to the caller, then push the item
                k =>
                {
                    Console.WriteLine("Added for " + k);
                    var retval = new Subject<T>();
                    subscribe(retval);
                    retval.OnNext(next);
                    return retval;
                },
                // later items for this key: push them into the existing Subject
                (k, existing) =>
                {
                    Console.WriteLine("Updated for " + k);
                    existing.OnNext(next);
                    return existing;
                });
        });
        // TODO dispose of all subscribers
    }

    // special case: key selector is just the item pass-through
    public static IDisposable SplitSubscribe<T>(
        this IObservable<T> source, 
        Action<IObservable<T>> subscribe)
    {
        return source.SplitSubscribe(item => item, subscribe);
    }
}

With this you can split the source observable and then throttle each of the resulting streams. Usage is as follows:
IObservable<string> dummyObservable = new string[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();

dummyObservable.SplitSubscribe(next => 
    next.Throttle(TimeSpan.FromMilliseconds(250)).Subscribe(Console.WriteLine));

Output (the original order is not preserved):

Added for a
Added for b
Updated for a
Updated for b
Updated for b
Added for c
Updated for a
a
c
b
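For contrast with Variation 1 in the other answer, here is a similar plain-C# model of "split by value, then throttle each value". ThrottlePerKeyModel is a hypothetical helper; times are strictly increasing ticks and completion of the source is not modelled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free model of "split by value, then Throttle each value's stream".
// Assumes strictly increasing tick times and a source that never completes.
// A value is dropped only if the *same* value arrives again less than dueTime later;
// survivors are emitted dueTime ticks after they arrived, in emission order.
List<(long Time, string Value)> ThrottlePerKeyModel(
    IReadOnlyList<(long Time, string Value)> events, long dueTime)
{
    var emitted = new List<(long Time, string Value)>();
    for (var i = 0; i < events.Count; i++)
    {
        var e = events[i];
        var superseded = false;
        for (var j = i + 1; j < events.Count; j++)
        {
            if (events[j].Value != e.Value) continue; // other keys do not matter here
            superseded = events[j].Time - e.Time < dueTime;
            break; // only the next occurrence of the same value is relevant
        }
        if (!superseded)
            emitted.Add((e.Time + dueTime, e.Value));
    }
    emitted.Sort((x, y) => x.Time.CompareTo(y.Time)); // arrival times are unique, so are these
    return emitted;
}

var result = ThrottlePerKeyModel(new[] { (100L, "a"), (150L, "b"), (200L, "a") }, 250);
Console.WriteLine(string.Join(", ", result.ConvertAll(e => $"{e.Time}:{e.Value}")));
```

With "a" at 100, "b" at 150 and "a" at 200 it prints 400:b, 450:a. Per key, the last value of a burst survives and is emitted dueTime late, whereas Variation 1 emits the first value straight away.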

Also: Throttle is problematic here because it introduces a delay as well. - James World

Content from Stack Overflow.