响应式扩展计时器/间隔重置

4

我有一个项目,需要每10秒发送一次状态消息,除非在此期间已经更新。也就是说,每当有更新时,计时器就会重置。

var res = Observable
  .Interval(TimeSpan.FromSeconds(10))
  .Where(_ => condition);

res.Subscribe(_ => Console.WriteLine("Status sent."));

现在我知道 "Where" 只会在计时器结束后应用,所以它没有任何帮助。但是,我想知道是否有一种方法可以重置时间间隔;或者使用一个带有重复的 Timer()。
3个回答

9

使用标准Rx操作符实现这很容易。

从您的问题中不清楚更新的确切含义。我假设您拥有某种可观察对象,它会在每次更新时触发,或者您可以创建一个称为.OnNext(...)的主题,在有更新时调用该主题。没有可观察的更新,很难知道何时重置计时器。

所以这就是代码:

var update = new Subject<bool>();

var res =
    update
        .Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0)))
        .Switch();

res
    .Subscribe(_ => Console.WriteLine("Status sent."));

update.OnNext(true);
res查询现在会等待直到从update获取值,然后它会选择一个新的Observable.Interval。这意味着在Select之后,类型是IObservable<IObservable<long>>,所以需要使用.Switch()将其转换为IObservable<long>.Switch()通过仅传递最新观察到的可观察对象的值并处理任何先前的可观察对象来实现此目的。换句话说,对于每个update,都会启动一个新的计时器并取消先前的计时器。这意味着如果更新频率高于10秒,则计时器永远不会触发。

现在,如果res可观察对象本身就是一个更新,则可以执行以下操作:

res
    .Subscribe(_ =>
    {
        update.OnNext(true);
        Console.WriteLine("Status sent.");
    });

没问题 - 它仍然可以工作,但是对于每个计时器触发,res 将创建一个新的计时器。这意味着任何依赖于你的 update 可观测/主题的东西仍将正确地运行。


1
非常干净的实现。我将其用于依赖于Redux存储(即流)来触发注销的“注销”计时器,如果状态在一段时间内没有更改,则会注销。这种类型的东西使得使用Rx变得非常有趣。 - Sam Storie

4
我有一个小助手方法,一直跟着我:
```

我保留这个小助手方法:

```
public static IObservable<long> CreateAutoResetInterval<TSource>(IObservable<TSource> resetter, TimeSpan timeSpan, bool immediate = false)
{
    return resetter.Select(_ => immediate ? Observable.Interval(timeSpan).StartWith(0) : Observable.Interval(timeSpan)).Switch();
}

基本上,这与Enigmativity的答案机制相同。

0

我认为在这里使用Throttle也是可行的。Throttle的目的不是让元素在给定的时间段内通过,如果接收到另一个元素,则不允许其通过。因此,在您的情况下,如果在10秒内收到更新消息,则不发送状态。请参见以下单元测试,其中将200个滴答作为限制期。

[TestMethod]
    public void Publish_Status_If_Nothing_Receieved()
    {
        //Arrange
        var scheduler = new TestScheduler();
        var statusObserver = scheduler.CreateObserver<Unit>();
        var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3),
            OnNext(700, 4));

        var waitTime = TimeSpan.FromTicks(200);

        //Act
        updateStream.Throttle(waitTime, scheduler)
            .Select(_ => Unit.Default)
            .Subscribe(statusObserver);

        //Verify no status received
        scheduler.AdvanceTo(100);
        Assert.AreEqual(0, statusObserver.Messages.Count);

        //Verify no status received
        scheduler.AdvanceTo(200);
        Assert.AreEqual(0, statusObserver.Messages.Count);

        //Assert status received
        scheduler.AdvanceTo(400);
        statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));

        //Verify no more status received
        scheduler.AdvanceTo(700);
        statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接