异步任务。带超时的WhenAll。

71

在新的异步dotnet 4.5库中,是否有一种方法可以设置Task.WhenAll方法的超时时间?我想获取几个来源,在5秒后停止,并跳过未完成的来源。

12个回答

122
您可以使用 Task.WhenAny() 将生成的 TaskTask.Delay() 结合起来:
await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));

如果您想在超时的情况下收集已完成的任务:

var completedResults =
  tasks
  .Where(t => t.Status == TaskStatus.RanToCompletion)
  .Select(t => t.Result)
  .ToList();

这个回答得到了最多的赞,但我们知道这是否是现在实现这个目标的有效方法吗? - TheJediCowboy
9
@CitadelCSAlum你的意思是什么?这段代码实现了被要求的功能。如果你不相信我,你可以阅读文档或者亲自尝试一下。 - svick
虽然这是被接受的答案,但它是否完全符合问题描述?如果我理解正确,如果超时发生在所有任务完成之前,那么就不会收到任何结果(即使有些任务已经完成)。我理解的对吗?我正在寻找一些可以从多个任务中提取结果的东西 - 只取那些在超时时间内完成的任务,而不管其余的任务是否能够完成。请参见下面的我的回答。 - Erez Cohen
1
@ErezCohen 你说得对。我想我主要回答了问题的标题而不是正文(特别是“跳过未完成的来源”部分)。 - svick
1
@James South 这两个片段可以通过在第一个之后直接调用第二个来合并。首先等待,然后收集已完成任务的结果。所有任务都可能完成,也可能只有一些或没有完成。 - Theodor Zoulias
显示剩余4条评论

27
我认为一种更清晰、更强大的选项,同时也正确处理异常,是使用Task.WhenAny在每个任务上以及一个超时任务,遍历所有已完成的任务并过滤超时任务,并使用await Task.WhenAll()而不是Task.Result来收集所有结果。
下面是一个完整的工作解决方案:
static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
{
    var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
    var completedTasks = 
        (await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
        Where(task => task != timeoutTask);
    return await Task.WhenAll(completedTasks);
}

有两个 WhenAll,是否存在性能问题?第二个 WhenAll 是为了拆箱一个 Task< >?你能解释一下吗? - Menelaos Vergis
1
@MenelaosVergis 第一个 Task.WhenAll 是在返回已完成任务的任务上执行的(即 Task.WhenAny 的结果)。然后我使用 where 子句过滤这些任务。最后,我在这些任务上使用 Task.WhenAll 提取它们的实际结果。此时,所有这些任务都应该已经完成。 - i3arnon
我建议将ContinueWith配置为TaskScheduler.Default作为参数,以避免在任何奇怪的环境中运行连续任务,例如UITaskSchedulerLowPriorityTaskScheduler - Theodor Zoulias
相反,当所有的 continuation 只是进行一个非常短的调用或者只是返回一个值时,Current 调度器是最好的选择。没有理由去承担将调用封送到另一个线程的成本。如果线程已经处于活动状态,那么它是 UI TaskScheduler 还是低优先级调度器都无关紧要。如果需要的话,可以使用 TaskContinuationOptions.ExecuteSynchronously 来确保使用同一线程以避免重新调度。 - Panagiotis Kanavos
@TheodorZoulias,我认为你没有意识到你正在告诉一个创建性能指南的微软性能工程师他的代码是错误的。我认为你误解了这些指南,并在不理解何时以及为什么适用的情况下将其推向了极端。使用哪个线程返回常量值有什么关系呢?至于你的解决方案,你考虑过需要解包的这么多包装任务的复杂性和成本吗? - Panagiotis Kanavos

9
请查看微软的使用基于任务的异步模式中的“早期退出”和“Task.Delay”部分。
早期退出。通过一个WhenAny将由t1表示的操作与另一个任务t2分组,并且我们可以等待WhenAny任务。t2可能表示超时、取消或一些其他信号,这些信号将导致WhenAny任务在t1完成之前完成。

你想添加一个总结来说明它的内容吗? - svick
1
不确定您为什么回到这篇文章,但是您的代码示例正是论文所描述的(我假设您非常清楚)。根据您的要求,我已经更新了我的答案,并引用了原文。 - David Peden
@DavidPeden 这个链接已经失效了,谷歌搜索出现了这篇文章,不确定是否是你所指的那一个。https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap - DynaWeb
谢谢。我已经更新了链接,它是您提供的同一根文档下的第三篇文章。 - David Peden

2

您描述的似乎是一个非常常见的需求,但我无法找到任何相关示例。我进行了大量搜索……最终创建了以下内容:

TimeSpan timeout = TimeSpan.FromSeconds(5.0);

Task<Task>[] tasksOfTasks =
{
    Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
};

Task[] completedTasks = await Task.WhenAll(tasksOfTasks);

List<MyResult> = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();

我假设这里有一个名为SomeTaskAsync的方法,它返回Task<MyResult>。
在completedTasks成员中,只有类型为MyResult的任务是我们自己的任务,超时后才能完成。Task.Delay返回不同类型的任务。 这需要在类型上做出一些妥协,但仍然非常简单且有效。
(数组当然可以使用查询+ToArray动态构建)。
请注意,此实现不需要SomeTaskAsync接收取消标记。

这看起来像是应该封装成一个辅助方法的东西。 - svick
@ErezCohen,我已经把我的答案变得更简单了,如果你想看一下:https://dev59.com/Bmkw5IYBdhLWcg3wg6yg#25733275 - i3arnon
1
@I3arnon - 很好!我喜欢它。 - Erez Cohen

2
除了超时之外,我还会检查取消操作。如果您正在构建Web应用程序,这将非常有用。
public static async Task WhenAll(
    IEnumerable<Task> tasks, 
    int millisecondsTimeOut,
    CancellationToken cancellationToken)
{
    using(Task timeoutTask = Task.Delay(millisecondsTimeOut))
    using(Task cancellationMonitorTask = Task.Delay(-1, cancellationToken))
    {
        Task completedTask = await Task.WhenAny(
            Task.WhenAll(tasks), 
            timeoutTask, 
            cancellationMonitorTask
        );

        if (completedTask == timeoutTask)
        {
            throw new TimeoutException();
        }
        if (completedTask == cancellationMonitorTask)
        {
            throw new OperationCanceledException();
        }
        await completedTask;
    }
}

2
这段代码存在问题:如果任何常规任务完成,超时任务将不会被等待...因此,您的代码将一直运行,直到超时任务被处理掉,如果在超时任务运行结束之前就被处理掉了,那么您将会得到一个InvalidStateOperation。让任务保持未完成状态,您就没问题了。 - Stephan Steiner
我需要销毁任务(Tasks)吗? - Theodor Zoulias
@TheodorZoulias 你需要处理这些任务。如果超时或取消操作被触发,由调用者决定如何处理正在运行的任务。 - Tony
链接文章中,Tony,Stephen Toub说:“不需要。不必费心去处理你的任务。”而你却说:“你需要处理这些任务。”我感到困惑了。我应该听谁的建议? - Theodor Zoulias
@TheodorZoulias 很抱歉造成混淆。我的意思是由调用方决定是否应继续运行剩余的IEnumerable<Task> tasks,或在取消或超时发生时取消/停止(处理)它们。我使用“处理”这个术语并不严谨。您不必调用 Task.Dispose - Tony

1
void result 版本的 @i3arnon 的答案,包括注释和更改第一个参数使用扩展方法。
我还编写了一个转发方法,使用 TimeSpan.FromMilliseconds(millisecondsTimeout) 指定超时时间为整数,以匹配其他任务方法。
public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
{
  // Create a timeout task.
  var timeoutTask = Task.Delay(timeout);

  // Get the completed tasks made up of...
  var completedTasks =
  (
    // ...all tasks specified
    await Task.WhenAll(tasks

    // Now finish when its task has finished or the timeout task finishes
    .Select(task => Task.WhenAny(task, timeoutTask)))
  )
  // ...but not the timeout task
  .Where(task => task != timeoutTask);

  // And wait for the internal WhenAll to complete.
  await Task.WhenAll(completedTasks);
}

1

请查看http://tutorials.csharp-online.net/Task_Combinators中提出的自定义任务组合器。

async static Task<TResult> WithTimeout<TResult> 
   (this Task<TResult> task, TimeSpan timeout)
 {
   Task winner = await (Task.WhenAny 
      (task, Task.Delay (timeout)));
   if (winner != task) throw new TimeoutException();
   return await task; // Unwrap result/re-throw
}

我还没有尝试过它。

a) 链接已损坏。 b) 这适用于单个任务,这不是原帖所询问的内容。 - i3arnon

0

看起来,Task.WaitAll重载的超时参数就是你所需要的 - 如果它返回true,则表示所有任务都已完成 - 否则,你可以过滤IsCompleted。

if (Task.WaitAll(tasks, myTimeout) == false)
{
    tasks = tasks.Where(t => t.IsCompleted);
}
...

我认为这些任务都在它们自己的线程中启动,而新的异步函数不是,但如果我错了请纠正我。我刚开始学习这个新的异步编程内容。 - broersa
4
Task.WaitAll() 是阻塞的,所以如果可以避免的话,在 C# 5 中使用它不是一个好主意。 - svick
@broersa 首先,我认为你理解错了,线程与Taskasync方法之间的关系并不是那么简单。其次,这有什么关系呢? - svick
@svick 我想要的词是“阻塞”。现在情况变得更加清晰了。 - broersa

0
我找到了下面这段代码,它实现了我需要的功能:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net.Http;
using System.Json;
using System.Threading;

namespace MyAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            Console.WriteLine("Start Main");
            List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
            listoftasks.Add(GetGoogle(cts));
            listoftasks.Add(GetTwitter(cts));
            listoftasks.Add(GetSleep(cts));
            listoftasks.Add(GetxSleep(cts));

            List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
            List<MyObject> answer = new List<MyObject>();
            foreach (List<MyObject> answers in arrayofanswers)
            {
                answer.AddRange(answers);
            }
            foreach (MyObject o in answer)
            {
                Console.WriteLine("{0} - {1}", o.name, o.origin);
            }
            Console.WriteLine("Press <Enter>");
            Console.ReadLine();
        } 

        static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts) 
        {
            try
            {
                Console.WriteLine("Start GetGoogle");
                List<MyObject> l = new List<MyObject>();
                var client = new HttpClient();
                Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
                HttpResponseMessage res = await awaitable;
                Console.WriteLine("After GetGoogle GetAsync");
                dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                Console.WriteLine("After GetGoogle ReadAsStringAsync");
                foreach (var r in data.responseData.results)
                {
                    l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
                }
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }
        }

        static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
        {
            try
            {
                Console.WriteLine("Start GetTwitter");
                List<MyObject> l = new List<MyObject>();
                var client = new HttpClient();
                Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
                HttpResponseMessage res = await awaitable;
                Console.WriteLine("After GetTwitter GetAsync");
                dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                Console.WriteLine("After GetTwitter ReadAsStringAsync");
                foreach (var r in data.results)
                {
                    l.Add(new MyObject() { name = r.text, origin = "twitter" });
                }
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }
        }

        static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
        {
            try
            {
                Console.WriteLine("Start GetSleep");
                List<MyObject> l = new List<MyObject>();
                await Task.Delay(5000,cts.Token);
                l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }

        } 

        static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
        {
            Console.WriteLine("Start GetxSleep");
            List<MyObject> l = new List<MyObject>();
            await Task.Delay(2000);
            cts.Cancel();
            l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
            return l;
        } 

    }
}

我的解释在我的博客文章中: http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html


0

我试图改进优秀的 i3arnon解决方案,以修复一些小问题,但最终得到了完全不同的实现。我尝试解决的两个问题是:

  1. 如果有多个任务失败,传播所有失败任务的错误,而不仅仅是列表中第一个失败任务的错误。
  2. 防止内存泄漏,以防所有任务完成的时间比timeout快得多。 如果在循环中调用WhenAll并且timeout很大,则泄漏活动的Task.Delay可能导致泄漏的内存数量非常可观。

除此之外,我还添加了cancellationToken参数,XML文档说明了该方法的功能,并进行了参数验证。这就是代码:

/// <summary>
/// Returns a task that will complete when all of the tasks have completed,
/// or when the timeout has elapsed, or when the token is canceled, whatever
/// comes first. In case the tasks complete first, the task contains the
/// results/exceptions of all the tasks. In case the timeout elapsed first,
/// the task contains the results/exceptions of the completed tasks only.
/// In case the token is canceled first, the task is canceled. To determine
/// whether a timeout has occured, compare the number of the results with
/// the number of the tasks.
/// </summary>
public static Task<TResult[]> WhenAll<TResult>(
    Task<TResult>[] tasks,
    TimeSpan timeout, CancellationToken cancellationToken = default)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    tasks = tasks.ToArray(); // Defensive copy

    if (tasks.Any(t => t == null)) throw new ArgumentException(
        $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
    if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeout));

    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<TResult[]>(cancellationToken);

    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(timeout);

    var continuationOptions = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;

    var continuations = tasks.Select(task => task.ContinueWith(_ => { },
        cts.Token, continuationOptions, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        if (allContinuations.IsCompletedSuccessfully)
            return Task.WhenAll(tasks); // No timeout or cancellation occurred

        Debug.Assert(allContinuations.IsCanceled);

        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<TResult[]>(cancellationToken);

        // Now we know that timeout has occurred
        return Task.WhenAll(tasks.Where(task => task.IsCompleted));
    }, default, continuationOptions, TaskScheduler.Default).Unwrap();
}

这个WhenAll实现省略了async和await, 一般情况下不建议这样做。但在这种情况下,为了传播所有错误到一个非嵌套的AggregateException中,这是必要的。目的是尽可能准确地模拟内置的Task.WhenAll方法的行为。

使用示例:

string[] results;
Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
try
{
    results = await whenAllTask;
}
catch when (whenAllTask.IsFaulted) // It might also be canceled
{
    // Log all errors
    foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
    {
        _logger.LogError(innerEx, innerEx.Message);
    }
    throw; // Propagate the error of the first failed task
}
if (results.Length < tasks.Length) throw new TimeoutException();
return results;

注意:上述 API 存在设计缺陷。如果至少有一个任务失败或取消,就无法确定是否发生了超时。由 WhenAll 返回的任务的Exception.InnerExceptions 属性可能包含所有任务或部分任务的异常,而无法区分哪个是哪个。不幸的是,我想不出解决这个问题的办法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接