使用带有异步 lambda 的 Parallel.ForEach 等待所有迭代完成

3

最近我看到了几个与Parallel.ForEach和异步lambda相关的SO帖子,但所有提出的答案都是某种变通方法。

有没有什么办法可以写成以下这种形式:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});

如何确保列表将包含每个迭代中由lambda执行的所有项?

如果遇到await,Parallel.ForEach通常会如何处理异步lambda?它会将其线程移交给下一个迭代吗?

我假设ParallelLoopResult IsCompleted字段不是正确的字段,因为它将在执行所有迭代时返回true,无论它们的实际lambda作业是否完成?


await会强制释放任务线程,但不能保证在等待的任务完成后同一线程将继续执行。但是,Parallel ForEach会保证所有迭代都完成,并且x已添加到列表中,除非发生异常。 - Prateek Shrivastava
3
你最好使用var results = await Task.WhenAll(arrayvalues.Select(x => LongRunningIoOperationAsync(x)))。并行更适合于CPU密集型工作而不是IO密集型工作。 - juharr
1
你的实现不是线程安全的。你不能在Parallel.ForEach内部调用list.Add(x) - Enigmativity
1
如果您想限制并发异步操作的数量,请查看这里。@ldragicevic - Theodor Zoulias
2个回答

7
最近我看到了几个与Parallel.ForEach混合使用异步lambda表达式相关的SO线程,但所有提出的答案都是一些解决方法。这是因为Parallelasync不兼容。从另一个角度来看,为什么要首先混合它们呢?它们所做的事情正好相反。 Parallel 增加线程,而async放弃线程。如果要同时执行异步工作,请使用Task.WhenAll,那才是正确的工具;Parallel不是。话虽如此,听起来你想用错误的工具,这里是如何做到的...
如何确保list包含在每次迭代中执行的lambdas生成的所有项?
您需要有某种信号,使某些代码可以阻塞,直到处理完成,例如CountdownEventMonitor。顺便说一下,您还需要保护对非线程安全的List<T>的访问。
如果遇到await,Parallel.ForEach会将其线程交给下一个迭代吗?
由于Parallel不理解async lambda表达式,因此当第一个await返回到其调用者时,Parallel将假设该循环的 interation 已经完成。
我假设ParallelLoopResult IsCompleted字段不正确,因为它将在执行所有迭代时返回true,无论它们的实际lambda作业是否已完成?
正确。就Parallel而言,它仅“看到”首次await返回其调用者的方法。因此,它不知道async lambda何时完成。它也会过早地假定迭代已经完成,这会使分区出错。

总是很值得一读,文笔优美。请注意 @ldragicevic,您的接受投票应该在这里。 - TheGeneral
是的,绝对没错。@Stephen 非常感谢你的澄清!有了这些事实,一切都变得更加合理了。 - ldragicevic

5
在这里,您不需要使用Parallel.For/ForEach,您只需要等待一组任务列表。

背景

简而言之,您需要非常注意异步lambda表达式,如果将它们传递给ActionFunc<Task>

您的问题是因为Parallel.For / ForEach不适用于异步和等待模式IO绑定的任务。它们适用于cpu绑定负载。这意味着它们基本上具有Action参数,并且让任务计划程序为您创建任务

如果要同时运行多个异步任务,请使用Task.WhenAll,或一个TPL Dataflow Block(或类似物),它可以有效地处理CPU绑定IO绑定的工作负载,或者更直接地说,它们可以处理任务,这就是异步方法的内容。

除非您需要在lambda内部执行更多操作(您尚未显示),否则只需使用SelectWhenAll
var tasks = items .Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // here is your list of int

如果您愿意,您仍然可以使用await。
var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);

注意:如果需要使用Parallel.ForEach的扩展功能(即选项以控制最大并发),有几种方法可以实现,但RX或DataFlow可能是最为简洁的选择。

谢谢,我有点好奇,C#程序能否处理任意长度的项目,例如如果我传递1000个项目,是否明智安排一些有限数量的线程执行所有1000个任务,以小批量的线程进行执行。 - ldragicevic
2
@ldragicevic 默认的任务调度程序会根据工作负载、核心和启发式算法来限制并发。 - TheGeneral
1
感谢您的回复。我只是担心是否需要处理并行资源,但根据之前的评论,这是可以的。 - ldragicevic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接