Rx和任务 - 当新任务产生时取消正在运行的任务?

7
我有一个用户交互场景,我想用 Rx 来处理。
该场景类似于经典的“当用户停止输入时,执行一些操作”(通常是搜索到目前为止用户已键入的内容) (1) - 但我还需要:
  • (2) 仅获取“执行某些操作”单元的最新结果(见下文)
  • (3) 当启动新的工作单元时,取消正在进行中的任何工作(在我的情况下,这需要CPU密集型操作)
(1) 中,我使用 IObservable 监听用户事件,并使用 .Throttle() 做节流处理,以便仅在事件之间的暂停期间触发(“用户停止输入”)。
然后,我使用 .Select(_ => CreateMyTask(...).ToObservable())
这给了我一个 IObservable<IObservable<T>>,其中每个内部可观察对象包装了一个单独的任务。
要实现 (2),我最终应用了 .Switch(),以仅获取最新工作单元的结果。
那么 (3) 怎么办? 取消待处理任务?
如果我理解正确,每当出现新的内部 IObservable<T> 时,.Switch() 方法就会订阅它并从先前的一个(或多个)中取消订阅,从而导致它们被 Dispose()
也许可以将其与触发取消任务的方法连接在一起?
2个回答

14

你可以使用 Observable.FromAsync,它将生成在观察者取消订阅时被取消的令牌:

input.Throttle(...)
     .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
     .Switch()
     .Subscribe(...);

这将为每个工作单元生成一个新的令牌,并在Switch切换到新令牌时取消它。


1
谢谢!我不知道这个工厂方法。由于它的取消支持,它似乎比ToObservable()运算符更可取。 - Lee Campbell
是的,我只是几周前才发现它。Rx 的问题在于缺乏强有力的文档支持。 - Brandon
@Brandon Lee的电子书就是文档 :) - Cristian Diaconescu

3

你是否需要处理任务?

如果你只想使用Observables,那么你可以很好地自己完成这个任务。

试着做这样的事情:

var query =
    Observable.Create<int>(o =>
    {
        var cancelling = false;
        var cancel = Disposable.Create(() =>
        {
            cancelling = true;
        });
        var subscription = Observable.Start(() =>
        {
            for (var i = 0; i < 100; i++)
            {
                Thread.Sleep(10); //1000 ms in total
                if (cancelling)
                {
                    Console.WriteLine("Cancelled on {0}", i);
                    return -1;
                }
            }
            Console.WriteLine("Done");
            return 42;
        }).Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });

这个observable在for循环中执行了一些艰苦的工作,其中包括 Thread.Sleep(10);,但是当observable被dispose时,循环退出并且CPU密集型工作停止。然后您可以使用标准的Rx DisposeSwitch来取消正在进行的工作。

如果您想将其封装成一个方法,请尝试以下代码:

public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
    return Observable.Create<T>(o =>
    {
        var cancelling = false;
        var cancel = Disposable
            .Create(() => cancelling = true);
        var subscription = Observable
            .Start(() => work(() => cancelling))
            .Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });
}

然后使用如下函数调用:

Func<Func<bool>, int> work = cancelling =>
{
    for (var i = 0; i < 100; i++)
    {
        Thread.Sleep(10); //1000 ms in total
        if (cancelling())
        {
            Console.WriteLine("Cancelled on {0}", i);
            return -1;
        }
    }
    Console.WriteLine("Done");
    return 42;
};

这是我证明它有效的代码:

var disposable =
    ObservableEx
        .Start(work)
        .Subscribe(x => Console.WriteLine(x));

Thread.Sleep(500);
disposable.Dispose();

我得到了“取消50”(有时是“取消51”)作为我的输出。

不,我没有使用Task的要求。只是感觉将处理密集型操作封装在一个任务中很自然。我会仔细看看你的解决方案 :) - Cristian Diaconescu
@CristiDiaconescu - 老实说,我能看出TPL的优点,但我总是发现使用Rx的解决方案更加整洁和表达力更强。我尽量避免使用TPL而选择Rx。 - Enigmativity
如果您正在使用Rx,您提出的“Start”方法可以更容易理解为“Start(Func <CancellationToken,T> work)”,并使用“CancellationDisposable”生成令牌,而不是自定义可处理项。或者,如果您真正拥抱Rx:Start(Func <IObservable <Unit>,T> work),其中您的“Start”方法给它一个“AsyncSubject”,当订阅被处理时,它会发出取消信号。 Func <bool>感觉很笨拙,不符合Rx的精神。 - Brandon
@Brandon - 我理解你的观点,但已经有了接受Func<bool>参数的Rx方法- DoWhileWhile。在这种情况下,Func<bool>是合适的,因为我们需要一种直接中断生成函数执行的方式。大多数其他Rx运算符都是原子性的- 无法中断函数。至少这是我的想法。 - Enigmativity
1
虽然我给其他答案点了赞,但我同意每次我尝试使用Task/TPL时,最终都会在Rx中找到更好的解决方案(可测试/可组合/可取消/易读)。 - Lee Campbell
@LeeCampbell 我同意,但我们在我们的代码库中发现Rx已经提供了足够的TPL接口,以便我们可以轻松地在TPL和Rx之间切换,以满足我们编写的任何部分的需求。我对Rx所付出的思考深感印象深刻。 - Brandon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接