在C#中,Parallel.ForEach未按预期向ConcurrentBag添加项目

3
在我的Asp.Net Core WebApi Controller中,我收到了一个IFormFile[] files。我需要将其转换为List<DocumentData>。我最初使用了foreach,它运行良好。但后来决定改用Parallel.ForEach,因为我会接收很多(>5)文件。 这是我的DocumentData类:
public class DocumentData
{
    public byte[] BinaryData { get; set; }
    public string FileName { get; set; }
}

这里是我的Parallel.ForEach逻辑:

var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

例如,即使有两个文件作为输入,documents总是给出一个文件作为输出。我错过了什么吗?
我最初使用的是List<DocumentData>。我发现它不是线程安全的,所以改成了ConcurrentBag<DocumentData>。但我仍然得到了意外的结果。请帮助我找出问题出在哪里?
2个回答

7
我猜这是因为Parallel.Foreach不支持async/await。它只接受Action作为输入,并为每个项目执行它。对于异步委托,它将以“fire-and-forget”方式执行它们。在这种情况下,传递的lambda将被视为async void函数,而async void无法等待。
如果有一个重载可以接受Func<Task>,那么它就可以工作。
我建议您使用Select创建Task,并使用Task.WhenAll同时执行它们。
例如:
var tasks = files.Select(async currentFile =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

await Task.WhenAll(tasks);

此外,您可以通过仅从该方法返回DocumentData实例来改进该代码,在这种情况下,无需修改documents集合。 Task.WhenAll有一个重载,它以IEnumerable<Task<TResult>为输入,并生成TResult数组的Task。因此,结果将是这样的:
var tasks = files.Select(async currentFile =>
    {
        if (currentFile.Length > 0)
        {
            using (var ms = new MemoryStream())
            {
                await currentFile.CopyToAsync(ms);
                return new DocumentData
                {
                    BinaryData = ms.ToArray(),
                    FileName = currentFile.FileName
                };
            }
        }

        return null;
    });

var documents =  (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();

如果你正在使用 Task.WhenAll,那么你需要对它进行 await - DavidG
@FarhadJabiyev,现在不需要ConcurrentBag<DocumentData>了吧?我可以使用List<DocumentData>代替。 - fingers10
1
@fingers10 当然可以,因为Parallel.ForEach不支持异步lambda表达式。您可以使用简单的foreach按顺序执行它们,或者使用Task.WhenAll并发执行它们。 - Farhad Jabiyev
2
作为额外的奖励,您可以从Select lambda返回DocumentData,然后await Task.WhenAll(..)将得到一个DocumentData[] - Stephen Cleary
@StephenCleary 不错的想法。值得更新答案。我会这么做的。 - Farhad Jabiyev

3

您的思路是正确的,使用了一个并发集合,但却误用了一个TPL方法.

简而言之,您需要非常小心关于异步Lambda,如果您将它们传递给一个ActionFunc<Task>

您的问题在于Parallel.For / ForEach不适用于异步和等待模式IO绑定任务。它们适用于CPU绑定工作负载。这意味着它们本质上具有Action参数,并且让任务调度程序为您创建任务

如果您想同时运行多个任务,请使用Task.WhenAll,或一个TPL数据流ActionBlock,它可以有效处理CPU绑定IO绑定的工作负载,或更直接地说,它们可以处理任务,而任务是异步方法的内容。

根本问题在于当您在Action上调用异步Lambda时,您实际上创建了一个async void方法,它将以未观察到的任务形式运行。也就是说,您的TPL方法只是创建了一堆并行运行的未观察到的任务,然后没有等待它们。

可以这样理解,您让一些朋友去给您买菜,他们反过来告诉别人去购物,但您的朋友回来向您报告任务已完成。显然,它们没有完成任务,您也没有买到任何东西。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接