如何在并行foreach中使用await?

9

我花了大部分晚上尝试解决这个问题。

昨天我很幸运地接触到了Parallel.ForEach,它的工作方式与我的期望相符,除了一个细节。

我有以下代码:

Parallel.ForEach(data, (d) =>
{
    try
    {
        MyMethod(d, measurements);
    }
    catch (Exception e)
    {
        // log
    }
});

在方法"MyMethod"中,我有很多逻辑正在执行,大部分都很好,但是我会做一些API调用来获取数据,并且我使用了一个异步任务来进行此操作,以便能够使用"await"等待代码直到特定部分被执行,然后继续执行:
private async void MyMethod(PimData pimData, IEnumerable<ProductMeasurements> measurements)
{
    try
    {
        // a lot of logic but most relevant part 

        await Task.WhenAll(ExecuteMeasurmentAndChartLogic(pimData.ProductNumber, entity));

        await Task.WhenAll(resourceImportManager.HandleEntityImageFiles(pimData.ProductType + pimData.ProductSize,SwepImageType.Png, ResourceFileTypes.ThreeD, entity, LinkTypeId.ProductResource));

        await Task.WhenAll(resourceImportManager.HandleEntityImageFiles(pimData.ProductSketch, SwepImageType.Png, ResourceFileTypes.Sketch, entity, LinkTypeId.ProductResource));

    }
    catch (Exception e)
    {
        // log
    }
}

问题:

1 首先,循环在所有代码完成之前就结束了。

2 第二个问题是,我在很多 API 调用中都会收到“任务已取消”的消息。

3 正如上面提到的,代码不等待每个方法完全执行。

我无法让它在移动到下一步之前执行ExecuteMeasurmentAndChartLogic()方法中的所有内容。

这给我带来了以下问题(更多问题):

在此方法中,我创建一个项并将其添加到数据库中,而此项目需要更多信息,我从ExecuteMeasurmentAndChartLogic()内部进行的 API 调用中获取,但问题是创建了几个项,并且必须等待其余数据,这不是我想要的。

旁注:我知道在所有数据都到位之前创建条目并将其添加到数据库中不是最佳实践,但我正在朝着 PIM 进行集成,该过程是微妙的

我想要多个线程运行,但同时我希望每个项在移动到下一个方法之前完整地执行所有逻辑。

澄清:

运行多个项

每个项在移动到代码的下一部分之前处理全部需要处理的逻辑。通常使用await实现此目的。

在上面的代码中,resourceImportManager()方法在ExecuteMeasurmentAndChartLogic()完成之前就已经执行了,这不是我想要的。

我使用了:

Task task1 = Task.Factory.StartNew(() => MyMethod(data, measurements));
Task.WaitAll(task1);

但是这并没有帮助太多。

我对此还很新,一直没有弄明白我做错了什么。

编辑:已经更新了这个问题。

编辑:这是ExecuteMeasurmentAndChartLogic()的样子:

public async Task ExecuteMeasurmentAndChartLogic(string productNumber, Entity entity)
{
    try
    {
        GrafGeneratorManager grafManager = new GrafGeneratorManager();
        var graphMeasurmentList = await MeasurmentHandler.GetMeasurments(productNumber);

        if (graphMeasurmentList.Count == 0) return;

        var chart = await grafManager.GenerateChart(500, 950, SystemColors.Window, ChartColorPalette.EarthTones,
                    "legend", graphMeasurmentList);

        await AddChartsAndAddToXpc(chart, entity, productNumber);
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
}

编辑: 此背景: 我调用一个API获取大量数据。对于这些数据中的每个项目,我需要调用API并获取我应用于该项目的数据。

在阅读评论后,我还以不同的方式思考。我可以循环遍历所有项目,并针对它们进行轻微的逻辑处理,在任务列表中添加URL,然后创建单独的任务,逐个执行这些任务。

将保持更新


3
resourceImportManager.HandleEntityImageFiles 返回什么?初步看起来你正在针对单个任务使用 WaitAllWhenAll。此外,async void 是不可取的,你可以通过一些重构可能将 Parallel.ForEach 改为 Task.WhenAll - Peter Bons
4
同时,Parallel.For 更适用于 CPU 密集型任务,而 Task.WhenAll 更适用于 I/O 密集型任务。在 Parallel.For 中使用 async/await 是一种设计缺陷,以我看来。 - Peter Bons
4
与上面的评论相呼应,如果是 CPU-bound,则应使用 Parallel.ForEach,如果是 IO-bound,则应使用异步操作。 - Tim Rogers
3
我建议你在创建这样的逻辑之前阅读更多的教程,因为你混淆了太多的东西,而这不是解释所有这些内容的最佳地点。 - Peter Bons
@Peter Bons 我在使用 Task.WhenAll 之前确实使用了 await,但结果相同。至于更多的阅读,我同意并且已经做了,但我需要进行优化并且正在尝试边学边做。 - ThunD3eR
显示剩余9条评论
1个回答

17

不要使用Parallel.ForEach。将您的方法返回为Task而不是void,收集所有任务并像下面这样等待它们:

Task.WaitAll(data.Select(d => MyMethod(d, someParam)).ToArray());

1
我提供的代码是异步和并行的。如果线程池配置损坏或者没有足够的线程来捕获,它可能在一个线程中执行。或者可能是MyMethod存在瓶颈,实际上访问了相同的阻塞资源。然而,如果这个方法是有效的,那么我的答案也应该是有效的。 - Eduard Lepner
3
那么,你实际使用的方法并不是为并行运行而设计的,你需要解决这个问题。 - Servy
3
那个回答减少了可以并行完成的主要工作量,并错误地将不能并行完成的工作并行化,导致代码不安全。 - Servy
1
最终决定接受这个答案,因为它回答了所提出的问题。然而,我不会使用该解决方案,因为我需要多线程,而这会锁定每个线程。 - ThunD3eR
1
如何使用 Task.WaitAll(data.Select(d => Task.Run(() => MyMethod(d, someParam)).ToArray())); 保留并行性?我自己还没有测试过,这只是一个想法。 - turkinator
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接