如何等待任务数组并在第一个异常时停止等待?

17

我有一个任务数组,并使用 Task.WhenAll 等待这些任务。我的任务经常失败,如果发生故障,我会弹出一个消息框通知用户以便重试。我的问题是报告错误直到所有任务完成才被延迟。相反,我希望在第一个任务引发异常时立即通知用户。换句话说,我想要一个快速失败的版本 Task.WhenAll。由于没有内置的此类方法,我尝试制作自己的方法,但我的实现不符合我的预期。以下是我的解决方案:

public static async Task<TResult[]> WhenAllFailFast<TResult>(
    params Task<TResult>[] tasks)
{
    foreach (var task in tasks)
    {
        await task.ConfigureAwait(false);
    }
    return await Task.WhenAll(tasks).ConfigureAwait(false);
}

这通常比本机的Task.WhenAll更快,但通常速度还不够快。故障任务#2在任务#1完成之前不会被观察到。我应该如何改进它,以使其尽快失败?


更新:关于取消,当前不在我的要求范围内,但假设为了一致性,第一个取消的任务应该立即停止等待。在这种情况下,从WhenAllFailFast返回的组合任务应该具有Status == TaskStatus.Canceled

澄清:取消场景是用户点击“取消”按钮停止任务完成,而不是在出现异常时自动取消未完成的任务。


1
WhenAny或WaitAny可能是一个不错的选择。它会在任何任务完成时执行。然后,您可以决定重新排队"WhenAny",继续使用WhenAll代码或在异常情况下全部取消。| 编辑:看起来stannius在我完成想法之前就有了这个代码。 - Christopher
2
当一个任务失败时,您希望其他任务发生什么?您希望其他任务继续工作还是希望中止其他任务? - Peter Bons
1
这些任务是否接受取消标记或者它们能否被修改以接受一个标记,并且尊重它? - Peter Bons
1
@PeterBons,目前我的任务不支持取消。我正在使用一个带有异步方法的库,这些方法不接受“CancellationToken”,因此实际上我无法取消它们。 - Theodor Zoulias
1
@JSteward 这个duplicate包含一个复杂的递归方法WhenAllError,需要一个CancellationToken作为参数,并且无法编译。删除与CancellationToken相关的代码会导致该方法陷入无限循环。这显然不是解决我的问题的方法。 - Theodor Zoulias
显示剩余5条评论
4个回答

9
你最好使用TaskCompletionSource构建WhenAllFailFast方法。你可以对每个输入任务使用.ContinueWith()同步继续,当任务以故障状态结束时(使用相同的异常对象)错误TCS。也许像这样(未经充分测试):
using System;
using System.Threading;
using System.Threading.Tasks;

namespace stackoverflow
{
    class Program
    {
        static async Task Main(string[] args)
        {

            var cts = new CancellationTokenSource();
            cts.Cancel();
            var arr = await WhenAllFastFail(
                Task.FromResult(42),
                Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")),
                Task.FromCanceled<int>(cts.Token));

            Console.WriteLine("Hello World!");
        }

        public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
        {
            if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());

            // defensive copy.
            var defensive = tasks.Clone() as Task<TResult>[];

            var tcs = new TaskCompletionSource<TResult[]>();
            var remaining = defensive.Length;

            Action<Task> check = t =>
            {
                switch (t.Status)
                {
                    case TaskStatus.Faulted:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetException(t.Exception.InnerException);
                        break;
                    case TaskStatus.Canceled:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetCanceled();
                        break;
                    default:

                        // we can safely set here as no other task remains to run.
                        if (Interlocked.Decrement(ref remaining) == 0)
                        {
                            // get the results into an array.
                            var results = new TResult[defensive.Length];
                            for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result;
                            tcs.SetResult(results);
                        }
                        break;
                }
            };

            foreach (var task in defensive)
            {
                task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
            }

            return tcs.Task;
        }
    }
}

编辑:展开AggregateException异常,支持取消操作,返回结果数组。防止数组变异、null或空数组。明确指定TaskScheduler。


我更新了我的问题,并提出了有关取消的要求。简而言之,它遵循快速失败原则。 - Theodor Zoulias
这个程序运行得相当不错,但有两个问题:1)如果失败了,我会得到一个“AggregateException”。我更希望直接获得特定的异常,就像我在等待“Task.WhenAll”时所做的那样。2)它不支持返回已完成任务的结果。偶尔,所有我的任务都成功完成,然后我确实需要结果来处理它们! - Theodor Zoulias
2
我建议在继续执行时使用await,例如在本地的async方法中。如果您确实要使用ContinueWith,则应该传递一个TaskScheduler - Stephen Cleary
@StephenCleary,啊!有趣的陷阱,谢谢你的提示。 - ZaldronGG
现在很好,谢谢!我明天会决定接受哪个答案。 - Theodor Zoulias
显示剩余4条评论

8

我最近再次需要使用WhenAllFailFast方法,并修改了@ZaldronGG的优秀解决方案,使其更加高效(并且更符合Stephen Cleary的建议)。下面的实现在我的PC上每秒处理大约350万个任务。

public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    if (tasks is null) throw new ArgumentNullException(nameof(tasks));
    if (tasks.Length == 0) return Task.FromResult(new TResult[0]);

    var results = new TResult[tasks.Length];
    var remaining = tasks.Length;
    var tcs = new TaskCompletionSource<TResult[]>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    for (int i = 0; i < tasks.Length; i++)
    {
        var task = tasks[i];
        if (task == null) throw new ArgumentException(
            $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
        HandleCompletion(task, i);
    }
    return tcs.Task;

    async void HandleCompletion(Task<TResult> task, int index)
    {
        try
        {
            var result = await task.ConfigureAwait(false);
            results[index] = result;
            if (Interlocked.Decrement(ref remaining) == 0)
            {
                tcs.TrySetResult(results);
            }
        }
        catch (OperationCanceledException)
        {
            tcs.TrySetCanceled();
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    }
}

1
有趣的使用异步 void。惊人的性能提升! - ZaldronGG

2
你的循环在伪串行中等待每个任务,这就是为什么它在检查任务2失败之前等待任务1完成的原因。
你可能会发现这篇文章对于在第一次失败后中止的模式有所帮助:http://gigi.nullneuron.net/gigilabs/patterns-for-asynchronous-composite-tasks-in-c/。最初的回答
    public static async Task<TResult[]> WhenAllFailFast<TResult>(
        params Task<TResult>[] tasks)
    {
        var taskList = tasks.ToList();
        while (taskList.Count > 0)
        {
            var task = await Task.WhenAny(taskList).ConfigureAwait(false);
            if(task.Exception != null)
            {
                // Left as an exercise for the reader: 
                // properly unwrap the AggregateException; 
                // handle the exception(s);
                // cancel the other running tasks.
                throw task.Exception.InnerException;           
            }

            taskList.Remove(task);
        }
        return await Task.WhenAll(tasks).ConfigureAwait(false);
     }

我用你的代码替换了WhenAllFailFast中的代码,但是它无法编译。你能提供一个完整的实现吗?我认为你的方法非常有前途! - Theodor Zoulias
我刚刚测试了一下,不起作用。你的 WhenAllFailFast 方法会等待所有任务完成,就像原生的 Task.WhenAll 一样。 - Theodor Zoulias
@TheodorZoulias 好的,我添加了几行代码,你需要在单个任务上检查异常。 - stannius
现在它运行得非常好!但是它不能很好地处理我的取消更新要求,不过这并不是什么大问题,因为目前我没有取消任务。 - Theodor Zoulias
为了取消其他任务,它们都必须接受并遵守取消令牌。这不是一件简单的事情(我已经做过了),而且你对取消的要求似乎有些松散。但我添加了另外几行代码来展示你应该在哪里进行操作。 - stannius

1
我在这个问题上添加了一个答案,不是因为我找到了更快的解决方案,而是因为我对在未知的同步上下文中启动多个异步无返回值操作有些怀疑。我在这里提出的解决方案要慢得多。它比@ZaldronGG的优秀解决方案慢大约3倍,比我之前基于异步无返回值的实现慢大约10倍。然而,它的优点是,在返回的Task<TResult[]>完成后,不会泄漏附加在观察到的任务上的遗忘继续。当此任务完成时,由WhenAllFailFast方法内部创建的所有继续都已清理。这是API通常希望的行为,但在许多情况下可能并不重要。
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);
    CancellationTokenSource cts = new();
    Task<TResult> failedTask = null;
    TaskContinuationOptions flags = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;
    Action<Task<TResult>> continuationAction = new(task =>
    {
        if (!task.IsCompletedSuccessfully)
            if (Interlocked.CompareExchange(ref failedTask, task, null) is null)
                cts.Cancel();
    });
    IEnumerable<Task> continuations = tasks.Select(task => task
        .ContinueWith(continuationAction, cts.Token, flags, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        Task<TResult> localFailedTask = Volatile.Read(ref failedTask);
        if (localFailedTask is not null)
            return Task.WhenAll(localFailedTask);
        // At this point all the tasks are completed successfully
        Debug.Assert(tasks.All(t => t.IsCompletedSuccessfully));
        Debug.Assert(allContinuations.IsCompletedSuccessfully);
        return Task.WhenAll(tasks);
    }, default, flags, TaskScheduler.Default).Unwrap();
}

这个实现与ZaldronGG的类似,它在每个任务上附加一个继续操作,不同之处在于这些继续操作是可取消的,并且当观察到第一个非成功的任务时,它们会被批量取消。它还使用了我最近发现的Unwrap技术,这消除了手动完成TaskCompletionSource<TResult[]>实例的需要,并且通常使得实现更简洁。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接