如何使用RX实现超时的顺序?

4
场景如下:如果设备在短时间内向服务器发出回调,则认为该设备正在通信中。我希望创建一个类来封装这种状态跟踪的功能。当调用设备时,超时应该被重置。回调时,确认连接,并将状态设置为true;如果回调超时,则应将其设置为false。但是下一次调用应该能够重新设置超时时间,而不考虑当前的状态。
我想使用RX和switch以及timeout来实现这个功能。但我不知道为什么它停止工作了。
public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();

public bool IsConnected { get; private set; }

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
        .Switch()
        .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}

public void ConfirmConnected()
{
    connected.OnNext(true);
}

public void SetPending()
{
    pending.OnNext(true);
}
}

这是一个“测试用例”:
var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!

我假设内部可观察对象的超时也会停止外部可观察对象。因为outer => lambda表达式不再被调用。正确的方式是什么?
谢谢。
2个回答

5
以下是一种不使用.TimeOut的替代方法来生成IsConnected值流的方式:
public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}
Observable.Amb 操作符会从先产生值的任意一个可观测对象中取值,这比使用异常处理更好。

4
问题在于Timeout实际上会引发异常,导致Rx订阅失败。一旦触发超时(按照代码编码的方式),将不会发送任何其他通知。Rx语法规定,*可以有OnNext消息,然后跟随一个OnCompleted或一个OnErrorTimeout发出的OnError之后,就不会再接收到任何消息。

您需要通过OnNext消息而不是OnError消息来传送超时消息。在旧代码中,您将任何OnError转换为false,并将任何OnNext转换为true。相反,您需要将正确的新IsConnected值嵌入到OnNext消息中。以下是如何操作:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}

1
谢谢。我已经测试了你的建议,发现需要在 Select 后面加上 .FirstOrDefaultAsync()。如果没有它,即使 connected 已经提交了值,超时仍然会被触发。与此同时,我已经找到了另一个解决方案:pending.Select(_ => connected.Buffer(TimeSpan.FromSeconds(timeoutSeconds), 1).FirstOrDefaultAsync()) .Switch().Subscribe(l => IsConnected = l.Count > 0, token); - ZorgoZ
你在什么情况下发现需要使用 FirstOrDefaultAsync - Shlomo
c.SetPending(); await Task.Delay(TimeSpan.FromSeconds(5)); c.ConfirmConnected(); c.IsConnected.Dump() /TRUE, OK/; await Task.Delay(TimeSpan.FromSeconds(20)); c.IsConnected.Dump(); 后者应该仍然是TRUE,但是没有使用FirstOrDefaultAsync时却是FALSE。 - ZorgoZ
修正了答案以解决那个错误,还有可能存在的另一个错误(异常不会被传播)。你提供的备选方案也很好。 - Shlomo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接