并行.ForEach中嵌套等待(await)

228

在一个 Metro 应用中,我需要执行多个 WCF 调用。由于需要进行大量的调用,因此我需要在并行循环中执行它们。但是问题在于,在 WCF 调用完成之前并行循环就已经退出了。

你会如何重构代码以满足预期功能?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

2
我已经将这个问题标记为使用异步lambda的并行foreach的重复问题,尽管那个问题比这个问题新几个月,因为另一个问题包含一个已经得到大量赞同的答案,该答案推荐了当前解决此问题最佳的方法,即新的Parallel.ForEachAsync API。 - Theodor Zoulias
11个回答

198
Parallel.ForEach() 的整个思想是,有一组线程,每个线程处理集合的一部分。正如您注意到的,这在使用 async-await 时无法完成,因为您希望在异步调用的持续时间内释放线程。你可以通过阻塞 ForEach() 线程来“修复”它,但这违反了 async-await 的整个目的。 你可以使用 TPL Dataflow 替代 Parallel.ForEach(),它很好地支持异步 Task。具体来说,你可以使用一个 TransformBlock 来将每个 id 转换成一个 Customer,使用 async lambda。该块可以配置为并行执行。然后将该块链接到一个 ActionBlock,以将每个 Customer 写入控制台。在设置好块网络之后,你可以将每个 id 提交到 TransformBlock 中,使用 Post() 方法。代码如下:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

尽管您可能想将TransformBlock的并行度限制为一些小的常量,但也可以通过使用SendAsync()异步地向其中添加项目来限制TransformBlock的容量,例如如果集合太大。

与您的代码相比(如果它能正常工作),另一个好处是,在单个项完成时写入将立即开始,而不是等待所有处理完成。


2
对于像我这样需要一些澄清的人,以下是有关async、reactive extensions、TPL和TPL DataFlow的简要概述 - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow/。 - Norman H
1
我非常确定这个答案没有并行处理。我认为你需要在ids上使用Parallel.ForEach,并将其发送到getCustomerBlock。至少当我测试这个建议时,发现是这样的。 - JasonLind
4
确实如此。使用 Parallel.ForEach() 实现并行发布项目不应该有任何实际影响。 - svick
1
@svick 好的,我找到了。ActionBlock也需要在Parallel中。我做法稍有不同,因为我不需要转换,所以我只是使用了BufferBlock,并在ActionBlock中完成了我的工作。我被网上的另一个答案搞混了。 - JasonLind
2
我的意思是在ActionBlock上指定MaxDegreeOfParallelism,就像你在示例中对TransformBlock所做的那样。 - JasonLind
显示剩余2条评论

149

svick的回答(像往常一样)非常出色。

但是,当您确实有大量数据需要传输时,我发现Dataflow更有用。或者当您需要一个 async 兼容队列时。

在您的情况下,一个更简单的解决方案是只使用 async 风格的并行处理:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();

19
如果你想手动限制并行性(在这种情况下,你很可能会这样做),那么用这种方式会更加复杂。 - svick
2
@JamesManning ParallelOptions 怎么会有帮助?它只适用于 Parallel.For/ForEach/Invoke,而正如 OP 所述,在这里并没有用处。 - Ohad Schneider
9
Parallel.ForEach 不支持 async - Stephen Cleary
4
@MikeT:那样不会按预期工作。PLINQ 不理解异步任务,因此该代码仅会并行化“异步”lambda的启动部分。 - Stephen Cleary
3
@Mike: Parallel(和Task<T>)是在async/await出现之前几年编写的,作为任务并行库(TPL)的一部分。当async/await出现时,它们可以选择为与async一起使用而创建自己的Future<T>类型,或者重用TPL中现有的Task<T>类型。两个决策都没有明显正确的答案,因此他们决定重用Task<T> - Stephen Cleary
显示剩余12条评论

98
使用DataFlow可能有些过度,正如svick所建议的那样,而Stephen的答案并没有提供控制操作并发性的手段。然而,这可以相当简单地实现:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}
< p > 可以通过使用数组而不是列表并替换已完成的任务来优化 < code > ToArray() 调用,但我怀疑在大多数情况下这不会有太大的差异。 根据 OP 的问题示例用法:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

编辑 Stack Overflow 用户和TPL高手Eli Arbel向我指出了Stephen Toub的一篇相关文章。像往常一样,他的实现既优雅又高效:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}

1
@RichardPierre 实际上,Partitioner.Create 的这种重载使用了块分区,可以动态地将元素提供给不同的任务,因此您描述的情况不会发生。另外请注意,静态(预定)分区在某些情况下可能更快,因为它具有较少的开销(特别是同步)。有关更多信息,请参见:https://msdn.microsoft.com/zh-cn/library/dd997411(v=vs.110).aspx。 - Ohad Schneider
1
@OhadSchneider 在//观察异常时,如果它抛出异常,它会冒泡到调用者吗?例如,如果我希望整个可枚举对象在任何部分失败时停止处理/失败? - Terry
3
如果顶层任务(由Task.WhenAll创建)出现异常,它将通过抛出一个 AggregateException 的方式传递给调用者。如果调用者使用了 await ,异常会在调用现场抛出。然而,Task.WhenAll 仍然会等待所有任务完成,而 GetPartitions 在调用 partition.MoveNext 时动态分配元素,直到没有更多元素需要处理为止。这意味着,除非您添加自己的机制来停止处理(例如CancellationToken),否则不会自行停止处理。 - Ohad Schneider
1
@gibbocool 我还不确定我是否理解了。假设您有7个任务,并且使用您在评论中指定的参数。进一步假设第一批次需要偶尔执行5秒钟的任务和三个1秒钟的任务。大约经过一秒钟后,5秒钟的任务仍将在执行,而三个1秒钟的任务将已完成。此时,剩下的三个1秒钟的任务将开始执行(它们将由分区器提供给三个“空闲”线程)。 - Ohad Schneider
2
@MichaelFreidgeim 你可以在 await body 之前做类似于 var current = partition.Current 的操作,然后在继续执行 (ContinueWith(t => { ... }) 中使用 current - Ohad Schneider
显示剩余16条评论

55
你可以使用新的AsyncEnumerator NuGet Package来节省工作量,这个包在4年前提问时还不存在。它可以让你控制并行度:
using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

声明:我是AsyncEnumerator库的作者,该库是开源的,并且在MIT许可下发布。我发布这条消息只是为了帮助社区。


你的库与.NET Core不兼容。 - Corniel Nobel
2
@CornielNobel,它与.NET Core兼容 - GitHub上的源代码对.NET Framework和.NET Core都进行了测试覆盖。 - Serge Semenov
我敢打赌那应该是 AsyncEnumerable 而不是 AsyncEnumerator :) - Serge Semenov
2
@SergeSemenov 我经常使用你的库中的 AsyncStreams,我必须说它非常出色。强烈推荐这个库。 - WBuck
如果任何一个方法出现错误,循环会终止吗?它会抛出聚合异常还是只有第一个异常? - Vikram Singh Saini
显示剩余4条评论

18

Parallel.Foreach包装在Task.Run()中,而不是使用await关键字,请使用[yourasyncmethod].Result

(你需要这样做Task.Run的事情,以免阻塞UI线程)

像这样:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;

4
有什么问题吗?我会像这样做。使用 Parallel.ForEach 来并行处理,等待所有操作完成,然后将整个操作推送到后台线程以实现响应式用户界面。这样有什么问题吗?也许会多出一个休眠线程,但这是短小易读的代码。 - ygoe
@LonelyPixel 我唯一的问题是它在TaskCompletionSource更可取时调用了Task.Run - Gusdor
1
@Gusdor 好奇 - 为什么TaskCompletionSource更可取? - Seafish
1
只是一个简短的更新。我现在正在寻找这个,向下滚动以找到最简单的解决方案,并再次找到了自己的评论。我使用了完全相同的代码,它按预期工作。它仅假定循环内有原始异步调用的同步版本。await可以移到前面以节省额外的变量名。 - ygoe
1
我不确定你的情况是什么,但我相信你可以删除Task.Run()。只需在末尾添加.Result或.Wait就足以使并行执行等待所有线程完成。 - Eduard G
显示剩余2条评论

8

这应该是相当高效的,比使用整个TPL数据流更容易:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}

使用示例不应该像这样使用await吗:var customers = await ids.SelectAsync(async i => { ... }); - Paccc

8

这是一个扩展方法,它利用了SemaphoreSlim,并且允许设置最大并行度:

/// <summary>Concurrently Executes async actions for each item of
/// <see cref="IEnumerable<typeparamref name="T"/></summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of
/// <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the
/// maximum degree of parallelism, Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel
/// is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    await action(item).ContinueWith(res =>
                    {
                        // action is completed, so decrement the number of
                        // currently running tasks
                        semaphoreSlim.Release();
                    }, TaskScheduler.Default);
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
    

使用示例:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

7

我来晚了,但你可能想考虑使用GetAwaiter.GetResult()在同步上下文中运行异步代码,但以下是并行的。

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});

5
介绍了一些帮助方法后,您将能够使用以下简单语法运行并行查询:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

这里发生的事情是:我们将源集合分成10个块(.Split(DegreeOfParallelism)),然后运行10个任务,每个任务逐一处理其项目(.SelectManyAsync(...)),并将它们合并回单个列表中。
值得一提的是,还有一种更简单的方法:
double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

但需要注意:如果您的源集合太大,则会立即为每个项安排一个任务,这可能会导致显着的性能损失。

上面示例中使用的扩展方法如下:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}

2
异步操作并行化的问题已经在.NET 6中引入Parallel.ForEachAsync API得到解决,但是使用旧版.NET平台的人们可能仍然需要一个合适的替代品。实现一种简单的方法是使用TPL Dataflow库中的ActionBlock<T>组件。该库包含在标准.NET库(.NET Core和.NET 5+)中,并且可用作.NET Framework的NuGet包。以下是如何使用它:
public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

此解决方案仅适用于实例化的“source”序列,因此参数类型为“ICollection<T>”,而不是更常见的“IEnumerable<T>”。它还具有忽略任何由“action”抛出的“OperationCanceledException”的令人惊讶的行为。解决这些细微差别并尝试精确复制“Parallel.ForEachAsync”的功能和行为是可行的,但需要几乎与使用更原始的工具一样多的代码。我在答案的第9个修订版中发布了这样的尝试。
以下是另一种实现Parallel.ForEachAsync方法的尝试,提供与.NET 6 API完全相同的功能,并尽可能模拟其行为。它仅使用基本的TPL工具。其思想是创建与所需的MaxDegreeOfParallelism相等的工作任务数量,每个任务以同步方式枚举相同的枚举器。这类似于Parallel.ForEachAsync内部实现的方式。不同之处在于,.NET 6 API从单个工作线程开始逐渐添加更多线程,而下面的实现从一开始就创建所有工作线程:
public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

    IEnumerator<T> enumerator = source.GetEnumerator();
    CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        break;
                    }
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

签名不同。body参数的类型为Func,而不是Func。这是因为value-task是一个相对较新的特性,在.NET Framework中不可用。
行为上也有所不同。该实现对body抛出的OperationCanceledException做出反应,以取消完成。正确的行为应该是将这些异常作为单独的错误传播,并完成为故障。修复这个小缺陷是可行的,但我更喜欢不进一步复杂化这个相对简短和可读的实现。

这个实现与您在此处分享的其他ForEachAsync()实现相比如何? - alhazen
@alhazen 我已经放弃了那个过于惯用的实现。这个答案中的Parallel_ForEachAsync实现复制了本地的Parallel.ForEachAsync的特性和行为,它可以并行调用body。我的另一个实现是顺序调用,每次只处理一个项目。这是它们的主要行为差异,并行性与异步并发 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接