Parallel.ForEach and async-await

59

我有这样的一个方法:

public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();

    foreach(var method in Methods)
    {
        string json = await Process(method);

        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);

    }

    return result;
}

然后我决定使用Parallel.ForEach

public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();

    Parallel.ForEach(Methods, async method =>
    {
        string json = await Process(method);    

        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);
    });

    return result;
}

但是现在我遇到了一个错误:

在异步操作仍在挂起的情况下,一个异步模块或处理程序已经完成。


你在哪里遇到了这个错误?我猜想这是一个异常,它是否发生在“GetResult”函数内部? - Peter Ritchie
你的“Model”实际上是一个视图模型吗?它是否实现了INotifyPropertyChanged并绑定到视图? - Peter Ritchie
不,它不是视图模型,可能我需要更改名称。它只是一个带有一些属性的简单“类”。 - Sergino
在返回 return result; 时出现异常。 - Sergino
另一个相关的问题:使用异步lambda的并行foreach - Theodor Zoulias
显示剩余2条评论
4个回答

88

ForEach 方法与 async 不兼容。具体来说,您的 async lambda 将被转换为 async void 方法。有很多避免使用 async void 的原因(我在 MSDN 文章中描述了其中之一);其中之一是您无法轻松检测到 async lambda 已完成。ASP.NET 将看到您的代码返回了而没有完成 async void 方法,然后会(恰当地)抛出异常。

您可能想要做的是以并发方式处理数据,但不是并行方式。在 ASP.NET 上几乎不应该使用并行代码。以下是使用异步并发处理的代码示例:

public async Task<MyResult> GetResult()
{
  MyResult result = new MyResult();

  var tasks = Methods.Select(method => ProcessAsync(method)).ToArray();
  string[] json = await Task.WhenAll(tasks);

  result.Prop1 = PopulateProp1(json[0]);
  ...

  return result;
}

2
为什么不应该在ASP.NET中使用并行处理? - Dirk Boer
20
@DirkBoer说:并行代码会显著降低ASP.NET的可伸缩性,并干扰其线程池调度策略。仅当您有可并行化的CPU密集型工作需要完成,并且确定只有少量并发用户时,才会有用。 - Stephen Cleary
1
如果我有大量的项目需要处理,这段代码不会尝试同时启动它们吗?这将需要数百个线程。我想,将多线程级别限制在CPU核心数量左右会比尝试同时执行所有任务更快,从而导致大量的任务切换开销。 - ygoe
2
@ygoe:“这段代码会同时启动它们吗?” 是的。 “需要数百个线程吗?” 不需要 - Stephen Cleary
1
@jmath412:听起来像是一个异步问题;Parallel.ForEach 不能与 async 一起使用。新的 Parallel.ForEachAsync 可以,或者 Task.WhenAll 也可以。 - Stephen Cleary
显示剩余11条评论

22

.NET 6终于增加了Parallel.ForEachAsync,这是一种调度异步工作的方式,允许您控制并行度:

var urlsToDownload = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://twitter.com/shahabfar"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urlsToDownload, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url, token);
    // The request will be canceled in case of an error in another URL.

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

13

或者,使用AsyncEnumerator NuGet包,您可以这样做:

using System.Collections.Async;

public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();

    await Methods.ParallelForEachAsync(async method =>
    {
        string json = await Process(method);    

        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);
    }, maxDegreeOfParallelism: 10);

    return result;
}

其中 ParallelForEachAsync 是一个扩展方法。


5
啊,好的。我想我现在知道怎么回事了。async method =>是“异步void”,这意味着调用者不能确定何时完成操作。因此,GetResult在操作仍在运行时返回。尽管我第一次回答的技术细节不正确,但结果在这里是相同的:ForEach启动的操作仍在运行时,GetResult就返回了。你唯一能做的事情就是不在Processawait(使lambda不再是async),并等待Process完成每个迭代。但是,这将使用至少一个线程池线程来执行此操作,并轻微地压力池-很可能使ForEach无意义。我只是不会使用Parallel.ForEach...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接