使用并行foreach和异步lambda表达式

293
我想要并行处理一个集合,但是在实现过程中遇到了困难,因此希望能得到一些帮助。
问题出现在我想要在并行循环的 lambda 表达式中调用一个在 C# 中标记为 async 的方法时。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

问题出现在计数为0的情况下,因为所有创建的线程实际上只是后台线程,并且Parallel.ForEach调用不会等待完成。如果我移除async关键字,该方法如下所示:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

它可以工作,但它完全禁用了await的巧妙之处,我必须进行一些手动的异常处理..(为了简洁起见已删除)。

我如何在lambda中使用await关键字来实现Parallel.ForEach循环?这可能吗?

Parallel.ForEach方法的原型接受一个Action<T>作为参数,但我希望它等待我的异步lambda。


3
我猜你的意思是在第二个代码块中从await GetData(item)中删除await,因为它会产生编译错误。 - Josh M.
3
可能是Nesting await in Parallel.ForEach的重复问题。 - Vitaliy Ulantikov
1
顺便提一下,ConcurrentBag<T>是一个非常专业化的集合类型。在这种情况下,使用ConcurrentQueue<T>会更好。 - Theodor Zoulias
10个回答

330
如果您只需要简单的并行处理,可以这样做:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

如果您需要更复杂的内容,请查看Stephen Toub的ForEachAsync文章

109
可能需要一个限制机制。这将立即创建与可能会生成10,000个网络请求等数量的项相同的任务。 - usr
14
史蒂芬·托布在他的文章中提到了最后一个例子来解决这个问题。 - svick
2
它创建了“dop”任务,然后每个任务按顺序处理输入集合的某些子集。 - svick
4
如果你调用Task.Run但没有等待结果,那就是将一项“启动并忘记”的工作投入到线程池中,这几乎总是一个错误。 - Stephen Cleary
4
对于这种方法,一个简单的节流机制是将列表分成包含 N 个条目的小列表,并为每个较小的批次执行此任务选择 + Task.WhenAll。这样,您就不会为大数据集产生数千个任务。 - Bjorn De Rijcke
显示剩余8条评论

152
你可以使用 AsyncEnumerator NuGet Package 中的 ParallelForEachAsync 扩展方法:
using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

声明:我是AsyncEnumerator库的作者,该库是开源的,并在MIT许可下发布。我发布这条消息只是为了帮助社区。


3
这是你的包裹吗?我现在已经在几个地方看到你发布了它? :D 哦等一下...你的名字在包裹上 :D +1 - Piotr Kula
39
@ppumkin,是的,这是我的。我一遍又一遍地看到这个问题,所以决定用最简单的方式来解决它,并让其他人也从中获益,不再挣扎 :) - Serge Semenov
2
你打错字了:maxDegreeOfParallelism 应该是 maxDegreeOfParalellism - Shiran Dror
3
正确的拼写确实是maxDegreeOfParallelism,然而在@ShiranDror的评论中有一些内容——在你的程序包中,你错误地将变量称为maxDegreeOfParalellism(因此除非更正,否则你引用的代码将无法编译..)。 - BornToCode
1
@SergeSemenov,如果是这样的话,我认为您可能需要更新此答案中的链接,因为它指向V1.10。既然您在这个问题上很活跃,我就把这个任务交给您了。 - StuartQ
显示剩余6条评论

137

新的.NET 6 API之一是Parallel.ForEachAsync,它是一种调度异步工作的方式,允许您控制并行度:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

另一个例子在Scott Hanselman的博客中

作为参考的来源


有关处理异常,请参见 https://dev59.com/k1kS5IYBdhLWcg3wQEcU 和微软文档 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-handle-exceptions-in-parallel-loops。 - Michael Freidgeim

37

使用SemaphoreSlim,您可以实现并行控制。

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

3
SemaphoreSlim应该使用using语句进行包装,因为它实现了IDisposable接口。 - Sal

16

这是一个尽可能简单的扩展方法,编译自其他答案和被接受的答案所引用的文章:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

更新:这里有一个简单的修改,也支持像评论中要求的取消令牌(未经测试)
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

2
@TheodorZoulias 两个观点都很好,已进行编辑。我还在开发一个支持“cancellationToken”的变体,稍后会发布它。 - Alex from Jitbit
1
值得注意的是,大多数基于Task.WhenAll的解决方案仅适用于相对较少的任务或保证不会抛出异常的asyncAction。否则,等待完成10,000个任务半小时,最终只收到一个异常结果(可能是由第一个任务引起的)会非常令人沮丧。 - Theodor Zoulias
1
这不是一个健壮的解决方案,原因有两个。首先,如果抛出异常,它将无法终止循环。其次,“throttler”未被处理。 - zmechanic
@zmechanic 我认为是否在异常时中止循环取决于开发人员。 - Alex from Jitbit
@Alex来自Jitbit的同意,但这并没有在你的回答中说明,并且LINQ的行为(就像你所做的那样)在这方面与foreach不同。在LINQ中,异常不会终止枚举。 - zmechanic
显示剩余5条评论

5

我轻量级的实现了ParallelForEach异步功能。

特性:

  1. 限流(最大并发度)。
  2. 异常处理(聚合异常将在完成时抛出)。
  3. 内存高效(无需存储任务列表)。

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

使用示例:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

@Hocas,你为什么认为需要TrySetResult? - nicolas2008
使用SemaphoreSlimCurrentCount属性来控制执行流程并不是一个好主意。在大多数情况下,它会创建竞争条件。使用Volatile.Read也是不稳定的(另一个可能的竞争条件)。我不会在生产环境中信任这个解决方案。 - Theodor Zoulias
@Theodor Zoulias,感谢您的反馈。如果您能提供证据或至少链接到官方文档来证明您的疑虑,那将更有建设性。 - nicolas2008
Nicolay,对于生产代码,我首选的解决方案是使用 TPL Dataflow 库中的 ActionBlock<T>。易用、高效、非常稳定,提供了许多配置选项,并且在 .NET Core 中本地可用,还能要求什么呢?如果由于某些原因无法使用此选项,则可以在此处找到一些可靠的实现(包括我的两个实现)。 - Theodor Zoulias
@Theodor Zoulias 我使用了这个解决方案,并且在一个具有大数据的生产系统中运行良好。该系统于2021年发布。但是我进行了一些修复... - Hocas
显示剩余8条评论

1

我已经创建了一个扩展方法,它利用SemaphoreSlim并允许设置最大并行度。

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

使用示例:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

foreach 循环将无限期等待信号量。请尝试运行以下简单代码以复现此问题:await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("测试异常"); }, maxDegreeOfParallelism: 2); - nicolas2008
@nicolay.anykienko,你说的第二点是对的。通过添加tasksWithThrottler.RemoveAll(x => x.IsCompleted)可以解决内存问题。 - askids
1
我已经在我的代码中尝试过了,如果maxDegreeOfParallelism不为空,代码会死锁。在这里,您可以看到所有的代码以进行复制:https://stackoverflow.com/questions/58793118/uwp-perform-long-view-model-creation-and-keep-ui-responsive/58824347#58824347 - Massimo Savazzi
当我考虑将其实现到我的项目中时,我对这种方法有所顾虑,因为我要处理的 170 万行数据会导致每个任务都在 tasksWithThrottler 列表中拥有一个工作项,这似乎并不理想或可扩展。于是我和我的团队合作提出了使用 ActionBlock 的另一种解决方案。 - Caleb Holt
请添加取消令牌代码... 也请。 - Seabizkit
@nicolas2008,你的意思是它会无限期地等待,如果是这样的话,那么for循环就永远不会存在了,因为只有在有更多要迭代的情况下才会进入循环。所以它肯定会退出,这意味着使用'using'将有所帮助,因为它会退出方法,对吧? - Seabizkit

1
在被接受的答案中,不需要使用ConcurrentBag。 以下是一种没有使用它的实现方式:
var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

任何 "// some pre stuff" 和 "// some post stuff" 都可以放入 GetData 实现中(或者调用 GetData 的其他方法中)。
除了更短之外,没有使用 "async void" lambda,这是一个反模式。

0
以下代码是针对 IAsyncEnumerable 进行设置的,但只需更改类型并在 foreach 上删除 "await" 即可修改为使用 IEnumerable。与创建无数个并行任务然后等待它们所有完成相比,这种方式更适用于大量数据集。
    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }

@TheodorZoulias 非常感谢您的反馈!这是我正在为一个项目积极努力的事情,所以我会查看这些更改并更新我的解决方案。 - Caleb Holt
1
@TheodorZoulias的回答展示了非常相似的方法... 可能SendAsync不会等待操作完成(这在文档中并不清楚)。 - Alexei Levenkov
1
@AlexeiLevenkov SendAsync 方法的文档非常令人困惑。我怀疑这个星球上从未存在过一个聪明到可以仅通过阅读文档就理解此方法的人。应该深入源代码并了解 PostSendAsync 方法都基于隐藏(显式实现的)OfferMessage API,该 API 有 5 种可能的返回值。SendAsync 异步处理 Postponed 返回值。 - Theodor Zoulias
@TheodorZoulias 那么,尝试在循环中进行完整/完成调用,并将其放置在finally中,然后允许异常指示它未运行到完成?我考虑捕获所有异常并返回聚合结果,但根据原因,这可能会导致大量异常。我想我可以通过某些选项类深入研究,并让调用者决定是否应该聚合异常或在第一次出现时终止。这是通用方法最灵活的方式。(此时我还通过cancelationToken进行了连接。) - Caleb Holt
Caleb,理想情况下,你应该注意来自 GetAsyncEnumeratorMoveNextAsyncDisposeAsync 方法的异常,如果其中任何一个失败,则通过调用其 Fault 方法将异常传播到 ActionBlock。不过这需要做很多工作,所以你可以采取捷径,只需在 try/catch 中包装循环,冒着方法实现中的错误也可能被作为正常操作错误浮出水面的风险。这就是我在 这个 实现中所做的。 - Theodor Zoulias
显示剩余2条评论

-2

对于一个更简单的解决方案(不确定是否最优),您可以将Parallel.ForEach嵌套在Task中,如下所示

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    }
}

ParallelOptions 可以为您提供开箱即用的限流功能。

我正在一个真实场景中使用它来在后台运行非常长的操作。这些操作通过 HTTP 调用,并且设计成在长时间操作运行时不会阻塞 HTTP 调用。

  1. 调用 HTTP 进行长时间后台操作。
  2. 操作在后台开始运行。
  3. 用户获得状态 ID,可以使用另一个 HTTP 调用来检查状态。
  4. 后台操作更新其状态。

这样,CI/CD 调用不会因为长时间的 HTTP 操作而超时,而是每隔 x 秒循环一次状态,而不会阻塞进程。


2
重力 我很抱歉不得不对你的答案进行投票否决,但是将异步委托传递给Parallel.ForEach方法不仅仅是“不是最佳实践”,而是存在严重和无法恢复的缺陷。Parallel.ForEach不理解异步委托,因此lambda是async void。它不是“发射并忘记”,而是“发射并崩溃”。在这种情况下,Parallel.ForEach将不会等待启动操作的完成,也不会强制执行最大并行度,并且不会传播异常。任何异常都将无法处理并导致进程崩溃。 - Theodor Zoulias
1
重力这是一个不好的例子。并行化Console.WriteLine方法没有意义,因为该方法是同步的。一次只能有一个线程写入Console。还要注意Thread.Sleep(15000);的丑陋。您添加了此行,否则程序将在不受控制地启动async void操作的Parallel.ForEach循环完成之前结束。这不是编写软件的正确方式。 - Theodor Zoulias
2
你不能通过提供糟糕的示例并间接推广不良实践来期望获得好评,无论你是否推荐它们。为什么不从你的答案中删除所有不良内容,只保留好的部分呢? - Theodor Zoulias
2
在编程领域中,与Parallel.ForEach方法相关的“您也可以使用异步lambda表达式”这句话,对我来说是一个无可争议的反对意见。无论之前或之后有多少警告,或者像删除线一样的删除指示,都不能使这个短语的存在变得容忍。我只是在谈论我的投票标准。其他人可以按照自己的意愿投票。 - Theodor Zoulias
1
接受并理解了。我没有考虑到这一点,我同意您的标准,并相应地更改了我的帖子。 - Gravity API
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接