如果发生任何异常,如何使Task.WaitAll()中断?

33

我希望使Task.WaitAll()在任何正在运行的任务抛出异常时中止,以便我不必等待60秒钟才能完成。我如何实现这样的行为? 如果WaitAll()无法实现该功能,是否有其他C#特性或解决方法?

Task task1 = Task.Run(() => throw new InvalidOperationException());
Task task2 = ...
...
try
{
    Task.WaitAll(new Task[]{task1, task2, ...}, TimeSpan.FromSeconds(60));
}
catch (AggregateException)
{
    // If any exception thrown on any of the tasks, break out immediately instead of wait all the way to 60 seconds.
}

12
有一个给负评的人可以解释一下吗?这是一个合理的要求。 - usr
4个回答

20
以下内容应该可以做到不修改原始任务的代码(未经测试):
static bool WaitAll(Task[] tasks, int timeout, CancellationToken token)
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

    var proxyTasks = tasks.Select(task => 
        task.ContinueWith(t => {
            if (t.IsFaulted) cts.Cancel();
            return t; 
        }, 
        cts.Token, 
        TaskContinuationOptions.ExecuteSynchronously, 
        TaskScheduler.Current).Unwrap());

    return Task.WaitAll(proxyTasks.ToArray(), timeout, cts.Token);
}

请注意,它只追踪错误的任务(即抛出异常的任务)。如果您还需要跟踪取消的任务,请进行以下更改:
if (t.IsFaulted || t.IsCancelled) cts.Cancel();
更新: 此处等待任务代理是多余的,正如评论中@svick所指出的那样。他提出了一种改进版本:https://gist.github.com/svick/9992598

4
你为什么要等待代理任务呢?等待原始任务不是更简单吗?(同时保持取消和连续性逻辑)像这样 - svick
@svick,感谢您的指点,这没有什么好的理由。这是一个遗留问题,在最初的版本中,我使用了cts = new CancellationTokenSource()而不是CreateLinkedTokenSource并将原始的token传递给Task.Wait,所以它需要代理。我正在更新答案。 - noseratio - open to work
@Noseratio,你为什么需要TaskContinuationOptions.ExecuteSynchronouslyTaskScheduler.Current?另外,代理任务是否必须传递cts.Token - Tarc
1
@Tarc,ExecuteSynchronouslyTaskScheduler.Current的作用是在相应的前置任务结束时使连续执行同步。Token被传递给代理任务,以允许在前置任务结束之前取消它。 - noseratio - open to work

0
我想建议对Noseratio上面的卓越答案进行轻微修改。在我的情况下,我需要保留原始抛出的异常,并在周围的try/catch中区分取消和异常状态。
public static void WaitUnlessFault( Task[] tasks, CancellationToken token )
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

    foreach ( var task in tasks ) {
        task.ContinueWith(t =>
        {
            if ( t.IsFaulted ) cts.Cancel();
        },
        cts.Token,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Current);
    }

    try {
        Task.WaitAll(tasks, cts.Token);
    }
    catch ( OperationCanceledException ex ) {
        var faultedTaskEx = tasks.Where(t => t.IsFaulted)
            .Select(t => t.Exception)
            .FirstOrDefault();

        if ( faultedTaskEx != null )
            throw faultedTaskEx;
        else
            throw;
    }
}

0
一种实现这个的方法是使用CancellationTokenSource。您可以创建一个cancellationtokensource,并将其作为参数传递给Task.WaitAll。思路是将您的任务包装在try/catch块中,在出现异常的情况下,调用cancellationtokensource上的cancel。
以下是示例代码:
CancellationTokenSource mainCancellationTokenSource = new CancellationTokenSource();

                Task task1 = new Task(() =>
                {
                    try
                    {
                        throw new Exception("Exception message");
                    }
                    catch (Exception ex)
                    {
                        mainCancellationTokenSource.Cancel();
                    }

                }, mainCancellationTokenSource.Token);

                Task task2 = new Task(() =>
                {
                    Thread.Sleep(TimeSpan.FromSeconds(3));
                    Console.WriteLine("Task is running");

                }, mainCancellationTokenSource.Token);

                task1.Start();
                task2.Start();

                Task.WaitAll(new[] { task1, task2}, 
                             6000, // 6 seconds
                             mainCancellationTokenSource.Token
                            );
            }
            catch (Exception ex)
            {   
                // If any exception thrown on any of the tasks, break out immediately instead of wait all the way to 60 seconds.
            }

0

并行类可以为您完成任务。您可以使用Parallel.For、ForEach或Invoke。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample_04_04_2014_01
{
    class Program
    {
        public static void Main(string[] args)
        {
            try
            {
            Parallel.For(0,20, i => {
                            Console.WriteLine(i);
                            if(i == 5)
                                throw new InvalidOperationException();
                            Thread.Sleep(100);
                         });
            }
            catch(AggregateException){}

            Console.Write("Press any key to continue . . . ");
            Console.ReadKey(true);
        }
    }
}

如果其中一个任务抛出异常,则除了那些已经开始执行的任务之外,不会执行任何其他任务。For、ForEach和Invoke在恢复控制到调用代码之前等待所有任务完成。如果使用ParallelLoopState.IsExceptional,甚至可以有更细粒度的控制。Parallel.Invoke更适合您的情况。

如果所有任务都不同,并且其中一些是I/O绑定的,比如HttpClient.GetStringAsync,这是否有帮助? - noseratio - open to work
Parallel.Invoke可以容纳不同的任务。调用者可以通过在ParallelOptions参数中设置取消令牌来取消整个操作,如果操作时间过长。 - mircea
1
更不用说Parallel.For不像Task.WaitAll那样容易支持timeout了。关键是你必须在Parallel.For的lambda内部执行类似于阻塞式的httpClient.GetStringAsync().Wait(token)调用。这会破坏并行I/O绑定任务的想法。Parallel.XXX只适用于CPU绑定、同质化的工作项。 - noseratio - open to work
我同意对于httpClient.GetStringAsync和其他返回Tasks的API来说,使用Parallel方法是很麻烦的,因为这些方法适合并行执行多个操作。因此,等待多个并行执行的任务完成的最佳方式取决于具体情况。 - mircea

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接