如何使用CancellationToken链接异步任务?

3
我希望能够在前提条件满足的情况下将一些任务链接起来,如果我使用的 CancellationToken 没有被触发,就继续执行。我的目标是实现类似于以下代码的效果:
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    if (cancellationToken.IsCancellationRequested) return;
    await t1();
    if (cancellationToken.IsCancellationRequested) return;
    await t2();
    if (cancellationToken.IsCancellationRequested) return;
    await t3();
    if (cancellationToken.IsCancellationRequested) return;
    await t4();
});
var timeout = Task.Delay(TimeSpan.FromSeconds(4));
var completedTask = await Task.WhenAny(t, timeout);
if (completedTask != t)
{
    cts.Cancel();
    await t;
}

我目前的内容是这样的,虽然有点啰嗦,但它能够工作。

6个回答

1
var cts = new CancellationTokenSource();
var t = Task.Run(async () => {
     await t1();
     await t2();
     await t3();
     await t4();
 }, cts.Token);

cts.CancelAfter(TimeSpan.FromSeconds(4));

try
{
     await t;
}
catch (OperationCanceledException)
{
     // The cancellation token was triggered, add logic for that
     ....
}

3
CancellationToken作为Task.Run的参数传递,只有在任务仍然被调度且尚未启动时才会取消该任务。因此,您的代码将允许所有任务运行或者不允许任何任务运行。这不是OP想要的。 - Theodor Zoulias
1
抱歉,伙计,这不是我想要的。 - Tiago Dall'Oca

1

你的原始代码基本正确,它假设你总是希望单个任务运行完成,如果取消则希望整个任务成功完成。但这些做法都不是惯用的。

更常见的做法可能是:

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    cancellationToken.ThrowIfCancellationRequested();
    await t1(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t2(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t3(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t4(cancellationToken);
}, cancellationToken);

然后,在其他地方:

cts.Cancel();

在这里,您可以省略对ThrowIfCancellationRequested的调用,假设各个任务在进入后不久就会检查它,但核心思想是,您应该将令牌传递到执行工作的最内部循环中,它们应该通过调用它来取消抛出异常,从而将任务设置为取消状态而不是成功状态。
(因此,如果遇到不接受CancellationToken参数的函数,则只需要“实际”调用ThrowIfCancellationRequested -- 这就是为什么所有异步方法都应该这样做,否则其任务将是不可取消的。)

你的解决方案可能会向调用者传播一个令人惊讶的“OperationCancelledException”异常。只有在调用者是取消的发起者时,此异常才适用。但这里并非如此。取消是由操作的内部工作引起的。我认为,传播超时发生的信息的正确方法是抛出TimeoutException异常。不过,我同意将令牌传递到各个异步方法中。 - Theodor Zoulias
@TheodorZoulias 是的,如果保留基于时间的取消,则整个操作应该被封装并转换为TimeoutException异常。然而,从原始问题的背景来看,超时并不是实际上取消操作的预期方式,它只是被用作示例。我只是按照那个例子进行操作。 - Miral
在OP的例子中,没有抛出任何异常。如果超时,整个任务将成功完成。你声称这是非惯用语,这是我不同意你的答案的另一点(我认为这是惯用语和不良实践)。 - Theodor Zoulias
这就是为什么我说它应该通过取消来完成整个任务,我们都同意这比成功更好(这是当前被接受的答案)。再次强调,我假设“按超时取消”这个具体示例不是最终预期的取消方法。因此,无论是谁取消了任务,任务都已经被取消,这应该通知任何观察任务的人——因此不会让人感到惊讶。 - Miral
我认为取消操作比成功操作更好,就像受伤比死亡更好一样。但两者都是不好的做法。它们都不能真正传达发生的情况,即“超时”。 - Theodor Zoulias
好的,我已经删除了超时部分,因为它并不相关。 - Miral

0

看起来你的目标只是在整个操作超过4秒时停止执行。

如果你将CancellationToken传递给你的t1/t2/等方法,我会说你做得已经很好了。但是由于你没有这样做,你可以使用Stopwatch而不是CancellationToken

var timeout = TimeSpan.FromSeconds(4);
var stopwatch = new Stopwatch();
stopwatch.Start();

await t1();
if (stopwatch.Elapsed > timeout) return;
await t2();
if (stopwatch.Elapsed > timeout) return;
await t3();
if (stopwatch.Elapsed > timeout) return;
await t4();
stopwatch.Stop();

我假设这在一个方法中,你可以使用return,但如有需要可以进行修改(返回值、抛出异常等)。

在这种情况下,对我来说使用 CancellationToken 是必须的。 - Tiago Dall'Oca
@TiagoDall'Oca 为什么?这个代码会被其他需要传递 CancellationToken 的代码调用吗? - Gabriel Luci
是的。这是我们在开发一些允许取消操作的功能时可能采用的某种模式。这里的超时实际上只是一个特定的使用情况。 - Tiago Dall'Oca

0

你的代码在功能上是没问题的,但是一眼看过去并不清楚它在做什么。因此,我建议你将这个逻辑封装在一个带有描述性名称和参数的实用方法中:

public static async Task RunSequentially(IEnumerable<Func<Task>> taskFactories,
    int timeout = Timeout.Infinite, bool onTimeoutAwaitIncompleteTask = false)
{
    using (var cts = new CancellationTokenSource(timeout))
    {
        if (onTimeoutAwaitIncompleteTask)
        {
            await Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    if (cts.IsCancellationRequested) throw new TimeoutException();
                    await taskFactory();
                }
            });
        }
        else // On timeout return immediately
        {
            var allSequentially = Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    var task = taskFactory(); // Synchronous part of task
                    cts.Token.ThrowIfCancellationRequested();
                    await task; // Asynchronous part of task
                }
            }, cts.Token);
            var timeoutTask = new Task(() => {}, cts.Token);
            var completedTask = await Task.WhenAny(allSequentially, timeoutTask);
            if (completedTask.IsCanceled) throw new TimeoutException();
            await completedTask; // Propagate any exception
        }
    }
}

这段代码与你的不同之处在于它会在超时时抛出一个TimeoutException异常。我认为强制调用者显式处理此异常比隐藏操作超时的事实更好。调用者可以通过留空catch块来忽略异常:

try
{
    await RunSequentially(new[] { t1, t2, t3, t4 },
        timeout: 4000,
        onTimeoutAwaitIncompleteTask: true);
}
catch (TimeoutException)
{
    // Do nothing
}

你在这里做的似乎有点过度,但我喜欢将任务放入集合的想法。 - Tiago Dall'Oca
@TiagoDall'Oca 我没有使用集合,因为每个Task都是单独等待的,而且没有必要跟踪已完成的Task。如果我必须处理Task<TResult>并将这些结果返回给调用者,我可以使用集合来保存Task结果。在C# 8中,甚至不需要这样做,因为我可以返回一个IAsyncEnumerable<TResult>,并逐个将结果发送给调用者。 - Theodor Zoulias

0
你应该考虑使用微软的响应式框架(也称为Rx)- NuGet System.Reactive并添加using System.Reactive.Linq; - 然后你就可以这样做:
IObservable<Unit> tasks =
    from x1 in Observable.FromAsync(() => t1())
    from x2 in Observable.FromAsync(() => t2())
    from x3 in Observable.FromAsync(() => t3())
    from x4 in Observable.FromAsync(() => t4())
    select Unit.Default;

IObservable<Unit> timer =
    Observable
        .Timer(TimeSpan.FromSeconds(4.0))
        .Select(x => Unit.Default)

IDisposable subscription =
    Observable
        .Amb(tasks, timer)
        .Subscribe();

如果计时器可观察对象在任务完成之前触发,则整个管道将被取消。不会运行任何不必要的任务。
如果您想手动取消,则只需调用subscription.Dispose()
代码也很简单美观。

是的,對我來說看起來相當不錯,但我真的在尋找一種在我的特定情況下使用 CancellationToken 的方法。儘管如此,這是非常整潔的程式碼!謝謝 - Tiago Dall'Oca
@TiagoDall'Oca - 为什么需要 CancellationToken?你在问题中展示的任务没有任何可以取消的内容。无论如何,使用 Rx 和 CancellationToken 是可能的。我需要了解更多关于你正在做的事情的信息。 - Enigmativity

-1

既然没有人给出更好的答案,那我自己来回答吧。

我的代码是我在问题中提供的,在我的看法中,它是最具连贯性的,并且足够通用,可以在其他情况下重复使用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接