取消一个长时间运行的Rx流

3

如果我有一个具有长时间运行操作的流,例如:

inputStream.Select(n => Task.Run(() =>
{
    // Long running operation
    Thread.Sleep(TimeSpan.FromSeconds(5));

    return n * n;
}).ToObservable())
.Switch()
.Subscribe(result => 
{
    // Use result in some way
    Console.WriteLine(result);
});

我该如何在Task.Run调用中获取一个CancellationToken,以便当Switch取消正在进行的计算的订阅时,将CancellationToken设置为已取消状态,以便我知道要中止计算。

1个回答

3
你可以使用 Observable.StartAsync 方法,例如:
inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
    if (token.IsCancellationRequested)
    {
        // .. don't need to do anything
        return 0;
    }
    else
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        return n * n;

    }                   
}))))
.Switch()
.Subscribe(Console.WriteLine);

或者,如果您将要生成多个值,则可以使用与Task一起工作的Observable.Create重载来获取CancellationToken。例如:

inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
    while (!token.IsCancellationRequested)
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        observer.OnNext(n * n);
    }

    observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);

在您的任务内,您需要调用OnNext来生成值。如果有返回值,则会被忽略。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接