Reactive Extensions超时未停止序列?

8

我正在尝试创建一个 IObservable<bool>,如果在最近5秒内收到UDP消息,则返回true;如果超时,则返回false

目前我的代码如下:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);

    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}

这样做存在以下问题:
- 一旦返回 false,序列就停止了。 - 我只需要在状态更改时得到 true 或 false。
我可以使用 Subject,但需要小心处理当没有订阅者时要释放 UDPBaseStringListener 可观察对象。
更新:
每次收到 UDP 消息时,我希望它返回 true。如果在过去的 5 秒内没有收到 UDP 消息,则希望它返回 false。

FYI,“Timeout”有一个重载,它接受一个替代的可观察对象,用于当超时发生而不是“抛出”和需要“Catch”的情况。 - Gideon Engelberth
1
读者可能也会对以下内容感兴趣:12,以及3 - Whymarrh
5个回答

5
正如Bj0所指出的那样,使用BufferWithTime解决方案不会在接收到数据点时立即返回数据点,并且在接收到数据点后不会重置缓冲区超时时间。
使用Rx Extensions 2.0,您可以使用新的Buffer重载解决这两个问题,它接受超时和大小参数。
static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}

3

缓冲区的问题在于,当你得到一个新的值时,"超时"间隔并没有被重置,缓冲窗口只是时间片段(在这种情况下为5秒),它们相互跟随。这意味着,根据你何时收到最后一个值,你可能需要等待几乎两倍的超时值。这也可能会错过超时。

               should timeout here
                         v
0s         5s         10s        15s
|x - x - x | x - - - - | - - - x -| ...
          true        true       true

然而,IObservable.Throttle 每次接收到新值时都会复位,并且只有在时间间隔已过(最后一个输入值)之后才会产生一个值。这可以用作超时,并与 IObservable 合并以将“超时”值插入流中:

var obs = BaseComms.UDPBaseStringListener(localEP)
            .Where(msg => msg.Data.Contains("running"));

return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) )
                        .Select( x => false ) )
            .DistinctUntilChanged();

一个可行的LINQPad示例:
var sub = new Subject<int>();

var script = sub.Timestamp()
    .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp())
    .Subscribe( x => 
{
    x.Dump("val");
});


Thread.Sleep(1000);

sub.OnNext(1);
sub.OnNext(2);

Thread.Sleep(10000);

sub.OnNext(5);

在等待2秒后,会在数据流中插入一个值为-1的数据。


不幸的是,如果您希望在超时期间未收到任何值时使可观察对象发出值,则此方法无法实现;只有当被观察的可观察对象实际上发出值时,才会发出超时响应。如果可观察对象停止发出值,则此方法将无效! - somethingRandom

1

如果您不想让序列停止,只需将其包装在Defer + Repeat中:

Observable.Defer(() => GettingUDPMessages(endpoint)
    .Repeat();

1

我建议避免使用Timeout - 它会引起异常,而编写带有异常的代码是不好的。

此外,似乎只有在一个值后你的可观察对象才停止。你可能需要更详细地解释你想要的行为。

我目前针对你的问题的解决方案是:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return Observable.Create<bool>(o =>
    {
        var subject = new AsyncSubject<bool>();
        return new CompositeDisposable(
            Observable.Amb(
                BaseComms
                    .UDPBaseStringListener(localEP)
                    .Where(msg => msg.Data.Contains("running"))
                    .Select(s => true),
                Observable
                    .Timer(TimeSpan.FromMilliseconds(10.0))
                    .Select(_ => false)
            ).Take(1).Subscribe(subject), subject.Subscribe(o));
    });
}

这有帮助吗?


默认情况下,“超时”只会引发异常。它有一个重载,如果达到超时时间,则使用替代的可观察对象,而默认的重载则调用Observable.Throw作为替代。 - Gideon Engelberth
@Jim - 你一定在使用不同版本的Rx - 它在v1.1.10621上编译得非常好。 - Enigmativity
你说得对。我正在使用v1.0.2856版。最新的稳定版本是v1.0.10621。最新的实验版本是v1.1.11111。 - Tim

0

根据这里的答案,我创建了以下扩展方法:

/// <summary>
/// Emits a value every interval until a value is emitted on the TakeUntil observable. The TakeUntil and Repeat
/// operators work together to create a resetting timer that is reset every time a value is emitted on the
/// TakeUntil observable.
/// </summary>
/// <param name="observable">The observable to observe for timeouts</param>
/// <param name="timeout">The value to use to emit a timeout value</param>
/// <param name="timeoutValue">The value to emit when the observable times out</param>
public static IObservable<T> SelectTimeout<T>(this IObservable<T> observable, TimeSpan timeout, T timeoutValue)
{
    IObservable<T> timeoutObservable = Observable
        .Interval(timeout)
        .Select(_ => timeoutValue)
        .TakeUntil(observable)
        .Repeat();

    return observable.Merge(timeoutObservable);
}

你需要以下的 using 指令:
using System;
using System.Reactive.Linq;

请注意,超时值在每次超时后都会发出。如果您希望仅发出一个超时值,则可以使用DistinctUntilChanged方法或查看链接的答案。我更喜欢不断发出超时信号,因为它更简单易懂。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接