我有一个 IObservable<String>
。
我正在尝试检测(并处理)短时间内相同的字符串被通知的情况。
我希望有一个过滤器/流/可观察对象,如果同样的字符串在250毫秒内多次通知,则仅通知一次。
不确定从哪里开始。
我有一个 IObservable<String>
。
我正在尝试检测(并处理)短时间内相同的字符串被通知的情况。
我希望有一个过滤器/流/可观察对象,如果同样的字符串在250毫秒内多次通知,则仅通知一次。
不确定从哪里开始。
这里有一个非常紧凑的解决方案。您的帖子对于持续时间是否会在出现不同值后立即重置有些模糊 - 因此,我提供了两个解决方案来解释这两个情况。
这是当您严格关注抑制持续时间并且不关心是否存在任何“中间”值(与McGarnagle的解决方案相同)时的情况 - 例如,如果您快速获取 "a","b","a"
,您仍然想要抑制第二个"a"
。 幸运的是,使用GroupByUntil
非常容易,它会为持续时间分组并发出每个组的第一个元素:
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.GroupByUntil(k => k,
_ => Observable.Timer(duration, scheduler))
.SelectMany(y => y.FirstAsync());
}
如果您对方法名称有疑问-我首先想到的是Variation 2b; 我将名称保留为原样,以便单元测试仍然可以通过。它可能需要一个更好的名称,例如SuppressDuplicatesWithinWindow
或类似的...
这稍微复杂一些-现在任何来自不同组的事件都将结束给定的组。我使用Publish().RefCount()组合来防止对源的多个订阅,并且必须非常小心处理null:
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
var sourcePub = source.Publish().RefCount();
return sourcePub.GroupByUntil(
k => k,
x => Observable.Timer(duration, scheduler)
.TakeUntil(
sourcePub.Where(i => ReferenceEquals(null, i)
? !ReferenceEquals(null, x.Key)
: !i.Equals(x.Key))))
.SelectMany(y => y.FirstAsync());
}
这是原始方法的变种,我把它加进来是因为我的对2a的改进使它更复杂了:
这是接受持续时间参数的Observable.DistinctUntilChanged
的变体。给定一个事件,在该持续时间内连续的重复事件将被忽略。如果出现不同的事件或在该持续时间之外出现事件,则会发出该事件并重置抑制计时器。
它通过使用重载的DistinctUntilChanged
(接受IEqualityComparer)来工作。比较器认为应用了TimeStamp的事件相等,如果值匹配并且时间戳在指定的持续时间内。
public static partial class ObservableExtensions
{
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.Timestamp(scheduler)
.DistinctUntilChanged(new Comparer<T>(duration))
.Select(ts => ts.Value);
}
private class Comparer<T> : IEqualityComparer<Timestamped<T>>
{
private readonly TimeSpan _duration;
public Comparer(TimeSpan duration)
{
_duration = duration;
}
public bool Equals(Timestamped<T> x, Timestamped<T> y)
{
if (y.Timestamp - x.Timestamp > _duration) return false;
return ReferenceEquals(x.Value, y.Value)
&& !ReferenceEquals(null,x.Value)
&& x.Value.Equals(y.Value);
}
public int GetHashCode(Timestamped<T> obj)
{
if (ReferenceEquals(null,obj.Value)) return obj.Timestamp.GetHashCode();
return obj.Value.GetHashCode() ^ obj.Timestamp.GetHashCode();
}
}
}
这是我使用的单元测试(包括 NuGet 包 Rx-Testing 和 NUnit):
public class TestDistinct : ReactiveTest
{
[Test]
public void DuplicateWithinDurationIsSupressed()
{
var scheduler = new TestScheduler();
var source =scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void NonDuplicationWithinDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100,"a"),
OnNext(200,"b"));
}
[Test]
public void DuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "a"));
}
[Test]
public void NonDuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "b"));
}
[Test]
public void TestWithSeveralValues()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
}
[Test]
public void CanHandleNulls()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null),
OnNext(700, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null));
}
[Test]
public void TwoDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(150, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void TwoNullDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, (string)null),
OnNext(150, (string)null),
OnNext(200, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, (string)null));
}
}
最后为了完整性- Variation 1将通过以下多值测试的变体:
[Test]
public void TestWithSeveralValuesVariation1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(900, "a"));
}
而空值测试将更改为在末尾具有:
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(700, (string)null)); /* This line changes */
您正在寻找 Observable.Throttle
。
使用指定的源和 dueTime 忽略在另一个值之后到期时间的可观测序列中的值。
编辑
好的,所以上面只适用于节流序列中的所有元素,而不是按照 OP 的要求按键进行节流。我以为这将是下一步很容易实现的,但也许并不是那么简单? (F# 有一个 split
函数可能会有帮助,但显然没有 C# 等效函数。)
因此,这里尝试实现 Split
:
public static class Extension
{
public static IDisposable SplitSubscribe<T, TKey>(
this IObservable<T> source,
Func<T, TKey> keySelector,
Action<IObservable<TKey>> subscribe)
{
// maintain a list of Observables, one for each key (TKey)
var observables = new ConcurrentDictionary<TKey, Subject<TKey>>();
// function to create a new Subject
Func<TKey, Subject<TKey>> createSubject = key =>
{
Console.WriteLine("Added for " + key);
var retval = new Subject<TKey>();
subscribe(retval);
retval.OnNext(key);
return retval;
};
// function to update an existing Subject
Func<TKey, Subject<TKey>, Subject<TKey>> updateSubject = (key, existing) =>
{
Console.WriteLine("Updated for " + key);
existing.OnNext(key);
return existing;
};
return source.Subscribe(next =>
{
var key = keySelector(next);
observables.AddOrUpdate(key, createSubject, updateSubject);
});
// TODO dispose of all subscribers
}
// special case: key selector is just the item pass-through
public static IDisposable SplitSubscribe<T>(
this IObservable<T> source,
Action<IObservable<T>> subscribe)
{
return source.SplitSubscribe(item => item, subscribe);
}
}
IObservable<string> dummyObservable = new string[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();
dummyObservable.SplitSubscribe(next =>
next.Throttle(TimeSpan.FromMilliseconds(250)).Subscribe(Console.WriteLine));
输出(原始顺序未保留)
添加到a 添加到b 更新a 更新b 更新b 添加到c 更新a a c b