使用RX Observable创建“防抖”效果

4

我有一个FTP客户端,希望保持连接到FTP服务器,除非(比如)一分钟内没有任何活动。我想使用Observable来实现这个目标。

以下是一个非常简单的Linqpad脚本,演示了这个概念:

async Task Main()
{
    var client = new Client();
    client.Connect();

    var debounce = new Subject<int>();
    debounce
        .Throttle(TimeSpan.FromSeconds(1))
        .Subscribe(eventNumber => client.Disconnect(eventNumber));

    // Something uses the FTP client
    debounce.OnNext(1);
    await Task.Delay(200);

    // Something else uses the FTP client
    debounce.OnNext(2);
    await Task.Delay(300);

    // No activity, the client will disconnect
    await Task.Delay(1000);
}

public class Client
{
    public void Connect() => Console.WriteLine("Connected");
    public void Disconnect(int eventNumber) => Console.WriteLine($"Disconnected: {eventNumber}");
}

这个方案很完美 - 在事件“2”之后,客户端会断开连接。
问题:有没有更好的方法来实现这个功能?或者更准确地说,有没有更好的方法不使用Subject来实现?
编辑:
下面是这个类的更加详细的版本 - 实际上,它订阅了一个可观察对象,该对象会告诉它需要下载的一些文件;如果在一段时间内没有传输任何文件,则希望客户端断开连接。
public class MyClassThatDownloadsViaFtp
{
    private IObserver<Unit> _debouncer;
    private FtpClient _client;

    public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
    {
        filesToDownloadViaFtp.Subscribe(DownloadFileViaFtp);

        // Disconnect after a minute of activity
        _debouncer = new Subject<Unit>();
        _debouncer
            .Throttle(TimeSpan.FromMinutes(1))
            .Subscribe(_ => DisconnectFtpClient());
    }

    public void DownloadFileViaFtp(FileToDownload file)
    {
        if (_client == null) _client = ConnectFtpClient();

        // Signal that the client is doing some work to prevent disconnect
        _debouncer.OnNext(Unit.Default);
        _client.Download(file.PathOnFtpServer);
    }

    // implementation irrelivent
    private FtpClient ConnectFtpClient() => new FtpClient();
    private FtpClient DisconnectFtpClient() => _client = null;
}

我发现由于我有一个源流,通过对其进行限流来达到相同的效果可能更容易(如下所示); 然而,在我没有可以限流的源流的情况下,我仍然想知道最佳方法是什么。

public class MyClassThatDownloadsViaFtp 
{
    private FtpClient _client;

    public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
    {
        filesToDownloadViaFtp
            .Select(DownloadFileViaFtp)
            .Throttle(TimeSpan.FromMinutes(1))
            .Subscribe(_ => DisconnectFtpClient());
    }

    public Unit DownloadFileViaFtp(FileToDownload file)
    {
        if (_client == null) _client = ConnectFtpClient();
        _client.Download(file.PathOnFtpServer);

        return Unit.Default;
    }

    // implementation irrelivent
    private FtpClient ConnectFtpClient() => new FtpClient();
    private FtpClient DisconnectFtpClient() => _client = null; 
}

客户端上的活动是什么样子?您需要某种事件(代替主题)来使客户端保持活动状态。使用Observable.Amb运算符很容易实现您想要的功能,但我需要知道用什么来触发扩展租约。 - Enigmativity
我已经添加了更多的示例,展示了我想要实现的更好的例子,希望这有所帮助。在这种情况下,客户端正在做什么是不相关的(我认为!)-- 更多地涉及到它被使用的程度。我对 observables 还很陌生,所以我的理解可能有偏差! - gerrod
客户端需要发出信号表明它仍然处于活动状态。因此这并不是无关紧要的。 - Enigmativity
另外,这个FtpClient是什么?它和Client一样吗?为什么_client = null是断开客户端连接的方法?这不像是真正的代码。 - Enigmativity
你说得对,这不是真正的代码,但它是真正代码的抽象。如果客户端有东西要下载(通过 filesToDownloadViaFtp 可观察对象),则客户端处于活动状态。下载速度非常快(<1秒),断开连接的超时时间设置为1分钟,因此客户端本身不需要发出仍然活动的信号。如果下载队列中1分钟内没有任何内容,则可以安全地假定客户端不再处于活动状态。 - gerrod
1个回答

2
您基本上已经回答了您的问题:
public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
{
    filesToDownloadViaFtp
        .Select(DownloadFileViaFtp)
        .Throttle(TimeSpan.FromMinutes(1))
        .Subscribe(_ => DisconnectFtpClient());
}

如果您没有像filesToDownloadViaFtp这样方便的流,则可以使用Observable.CreateObservable.FromEvent,或Observable.FromEventPattern等创建一个。
一个小问题是:Select最好不要有副作用,而DownloadFileViaFtp非常具有副作用。副作用最好在Subscribe调用中执行。

嗯,很有道理,我应该订阅两次(一次用于下载,一次用于限速)...谢谢建议! - gerrod

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接