在新的异步dotnet 4.5库中,是否有一种方法可以设置Task.WhenAll
方法的超时时间?我想获取几个来源,在5秒后停止,并跳过未完成的来源。
Task.WhenAny()
将生成的 Task
与 Task.Delay()
结合起来:await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));
如果您想在超时的情况下收集已完成的任务:
var completedResults =
tasks
.Where(t => t.Status == TaskStatus.RanToCompletion)
.Select(t => t.Result)
.ToList();
Task.WhenAny
在每个任务上以及一个超时任务,遍历所有已完成的任务并过滤超时任务,并使用await Task.WhenAll()
而不是Task.Result
来收集所有结果。static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
var completedTasks =
(await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
Where(task => task != timeoutTask);
return await Task.WhenAll(completedTasks);
}
Task.WhenAll
是在返回已完成任务的任务上执行的(即 Task.WhenAny
的结果)。然后我使用 where 子句过滤这些任务。最后,我在这些任务上使用 Task.WhenAll
提取它们的实际结果。此时,所有这些任务都应该已经完成。 - i3arnonContinueWith
配置为TaskScheduler.Default
作为参数,以避免在任何奇怪的环境中运行连续任务,例如UITaskScheduler
或LowPriorityTaskScheduler
。 - Theodor ZouliasCurrent
调度器是最好的选择。没有理由去承担将调用封送到另一个线程的成本。如果线程已经处于活动状态,那么它是 UI TaskScheduler 还是低优先级调度器都无关紧要。如果需要的话,可以使用 TaskContinuationOptions.ExecuteSynchronously
来确保使用同一线程以避免重新调度。 - Panagiotis Kanavos您描述的似乎是一个非常常见的需求,但我无法找到任何相关示例。我进行了大量搜索……最终创建了以下内容:
TimeSpan timeout = TimeSpan.FromSeconds(5.0);
Task<Task>[] tasksOfTasks =
{
Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
};
Task[] completedTasks = await Task.WhenAll(tasksOfTasks);
List<MyResult> = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();
public static async Task WhenAll(
IEnumerable<Task> tasks,
int millisecondsTimeOut,
CancellationToken cancellationToken)
{
using(Task timeoutTask = Task.Delay(millisecondsTimeOut))
using(Task cancellationMonitorTask = Task.Delay(-1, cancellationToken))
{
Task completedTask = await Task.WhenAny(
Task.WhenAll(tasks),
timeoutTask,
cancellationMonitorTask
);
if (completedTask == timeoutTask)
{
throw new TimeoutException();
}
if (completedTask == cancellationMonitorTask)
{
throw new OperationCanceledException();
}
await completedTask;
}
}
IEnumerable<Task> tasks
,或在取消或超时发生时取消/停止(处理)它们。我使用“处理”这个术语并不严谨。您不必调用 Task.Dispose
。 - TonyTimeSpan.FromMilliseconds(millisecondsTimeout)
指定超时时间为整数,以匹配其他任务方法。public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
{
// Create a timeout task.
var timeoutTask = Task.Delay(timeout);
// Get the completed tasks made up of...
var completedTasks =
(
// ...all tasks specified
await Task.WhenAll(tasks
// Now finish when its task has finished or the timeout task finishes
.Select(task => Task.WhenAny(task, timeoutTask)))
)
// ...but not the timeout task
.Where(task => task != timeoutTask);
// And wait for the internal WhenAll to complete.
await Task.WhenAll(completedTasks);
}
请查看http://tutorials.csharp-online.net/Task_Combinators中提出的自定义任务组合器。
async static Task<TResult> WithTimeout<TResult>
(this Task<TResult> task, TimeSpan timeout)
{
Task winner = await (Task.WhenAny
(task, Task.Delay (timeout)));
if (winner != task) throw new TimeoutException();
return await task; // Unwrap result/re-throw
}
看起来,Task.WaitAll重载的超时参数就是你所需要的 - 如果它返回true,则表示所有任务都已完成 - 否则,你可以过滤IsCompleted。
if (Task.WaitAll(tasks, myTimeout) == false)
{
tasks = tasks.Where(t => t.IsCompleted);
}
...
Task.WaitAll()
是阻塞的,所以如果可以避免的话,在 C# 5 中使用它不是一个好主意。 - svickTask
或async
方法之间的关系并不是那么简单。其次,这有什么关系呢? - svickusing System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net.Http;
using System.Json;
using System.Threading;
namespace MyAsync
{
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
Console.WriteLine("Start Main");
List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
listoftasks.Add(GetGoogle(cts));
listoftasks.Add(GetTwitter(cts));
listoftasks.Add(GetSleep(cts));
listoftasks.Add(GetxSleep(cts));
List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
List<MyObject> answer = new List<MyObject>();
foreach (List<MyObject> answers in arrayofanswers)
{
answer.AddRange(answers);
}
foreach (MyObject o in answer)
{
Console.WriteLine("{0} - {1}", o.name, o.origin);
}
Console.WriteLine("Press <Enter>");
Console.ReadLine();
}
static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetGoogle");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetGoogle GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetGoogle ReadAsStringAsync");
foreach (var r in data.responseData.results)
{
l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetTwitter");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetTwitter GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetTwitter ReadAsStringAsync");
foreach (var r in data.results)
{
l.Add(new MyObject() { name = r.text, origin = "twitter" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(5000,cts.Token);
l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
{
Console.WriteLine("Start GetxSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(2000);
cts.Cancel();
l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
return l;
}
}
}
我的解释在我的博客文章中: http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html
我试图改进优秀的 i3arnon解决方案,以修复一些小问题,但最终得到了完全不同的实现。我尝试解决的两个问题是:
timeout
快得多。
如果在循环中调用WhenAll
并且timeout
很大,则泄漏活动的Task.Delay
可能导致泄漏的内存数量非常可观。除此之外,我还添加了cancellationToken
参数,XML文档说明了该方法的功能,并进行了参数验证。这就是代码:
/// <summary>
/// Returns a task that will complete when all of the tasks have completed,
/// or when the timeout has elapsed, or when the token is canceled, whatever
/// comes first. In case the tasks complete first, the task contains the
/// results/exceptions of all the tasks. In case the timeout elapsed first,
/// the task contains the results/exceptions of the completed tasks only.
/// In case the token is canceled first, the task is canceled. To determine
/// whether a timeout has occured, compare the number of the results with
/// the number of the tasks.
/// </summary>
public static Task<TResult[]> WhenAll<TResult>(
Task<TResult>[] tasks,
TimeSpan timeout, CancellationToken cancellationToken = default)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
tasks = tasks.ToArray(); // Defensive copy
if (tasks.Any(t => t == null)) throw new ArgumentException(
$"The {nameof(tasks)} argument included a null value.", nameof(tasks));
if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
throw new ArgumentOutOfRangeException(nameof(timeout));
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled<TResult[]>(cancellationToken);
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
var continuationOptions = TaskContinuationOptions.DenyChildAttach |
TaskContinuationOptions.ExecuteSynchronously;
var continuations = tasks.Select(task => task.ContinueWith(_ => { },
cts.Token, continuationOptions, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(allContinuations =>
{
cts.Dispose();
if (allContinuations.IsCompletedSuccessfully)
return Task.WhenAll(tasks); // No timeout or cancellation occurred
Debug.Assert(allContinuations.IsCanceled);
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled<TResult[]>(cancellationToken);
// Now we know that timeout has occurred
return Task.WhenAll(tasks.Where(task => task.IsCompleted));
}, default, continuationOptions, TaskScheduler.Default).Unwrap();
}
这个WhenAll
实现省略了async和await, 一般情况下不建议这样做。但在这种情况下,为了传播所有错误到一个非嵌套的AggregateException
中,这是必要的。目的是尽可能准确地模拟内置的Task.WhenAll
方法的行为。
使用示例:
string[] results;
Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
try
{
results = await whenAllTask;
}
catch when (whenAllTask.IsFaulted) // It might also be canceled
{
// Log all errors
foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
{
_logger.LogError(innerEx, innerEx.Message);
}
throw; // Propagate the error of the first failed task
}
if (results.Length < tasks.Length) throw new TimeoutException();
return results;
注意:上述 API 存在设计缺陷。如果至少有一个任务失败或取消,就无法确定是否发生了超时。由 WhenAll
返回的任务的Exception.InnerExceptions
属性可能包含所有任务或部分任务的异常,而无法区分哪个是哪个。不幸的是,我想不出解决这个问题的办法。