在Parallel.ForEach中多个async-await链式调用

5
我有一个Parallel.ForEach循环,遍历一个集合。在循环内部,我进行多次网络I/O调用。我使用了Task.ContinueWith并嵌套了后续的异步等待调用。处理顺序无关紧要,但每个异步调用中检索的数据应以同步方式处理。这意味着- 对于每个迭代,从第一个异步调用中检索到的数据应传递给第二个异步调用。完成第二个异步调用后,来自两个异步调用的数据应一起被处理。
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});

我尝试了上面的方法,但没有使用continue await时无法同步工作。现在,上述代码执行完成,但未处理任何记录。

请问是否有帮助?如果需要更多细节,请告诉我。


7
awaitContinueWith 混合在同一代码中似乎非常奇怪,如果已经在线程池上运行,使用 Task.Run 也很奇怪......我会查看重构的可能性,但这不仅仅是有点奇怪! - Marc Gravell
@MarcGravell:你可能是对的。我的意图是以同步的方式并行处理数据,其中发生多个网络I/O调用。 - Souvik Ghosh
4
一般而言,你不应该使用 ContinueWith ...... 几乎从来不需要再使用它了。 - Marc Gravell
如果您可以使用DataFlow库,那么您就有TransformBlock<TInput,TOutput>(以及其他相关类)。或者像这样:支持多线程的异步任务队列限流,适用于当前的使用情况。 - Jimi
3
不要将Parallel.ForEachasync / await混合使用! - user585968
3
Parallel.ForEach 不支持异步操作。传递的 lambda 表达式是 async void 的形式。这个方法会在开始异步操作后立即完成,MaxDegreeOfParallelism 选项也不会被遵守……会有很多问题。仅将此方法用于 CPU 密集型任务。 - Theodor Zoulias
2个回答

7

由于您的方法涉及I/O,因此应编写真正异步的方法,而不仅仅是使用Task.Run在线程池上同步运行。

然后,您可以结合使用Task.WhenAllEnumerable.Select

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

这将确保每个 国家 > > 计算 顺序进行,但每个 项目 都是并发且异步处理的。
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

信号量确保最多 2 个并发异步操作,取消标记将在连续 10 次异常后取消所有未完成的任务。
Interlocked 方法确保以线程安全的方式访问 failures。
进一步更新 使用两个信号量来防止多次迭代可能会更有效率。
将所有列表添加封装到单个方法中:
void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

您可以允许2个线程同时为Http请求提供服务,而1个线程执行添加操作,使该操作具有线程安全性:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        AddToLists(country, state, calculation);

        Interlocked.Exchange(ref failures, 0);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
        listAddSemaphore.Release();
    }
}));

你如何处理最大并行度和异常?我需要将最大请求/秒保持为2,并且如果连续失败10次,需要从循环中退出。 - Souvik Ghosh
@SouvikGhosh 请看一下这个:如何限制并发异步I/O操作的数量? - Theodor Zoulias

5
我认为你把这个问题过于复杂化了;在 Parallel.ForEach 内部,你已经处于线程池中,因此创建大量额外的任务实际上没有任何好处。因此,如何处理取决于 GetState 等是否同步。如果我们假设它们是同步的,那么可以使用以下代码:
Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

如果它们是异步的,情况会变得更加尴尬;如果我们能够像这样做就好了:nice
// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

然而,这里的问题是Parallel.ForEach中没有任何可等待的回调函数,也就是说:我们在这里悄然地创建了一个async void 回调函数,这是非常糟糕的。这意味着Parallel.ForEach将会在发生未完成的await时认为它已经“完成”,因此:

  1. 我们不知道所有工作实际上何时结束
  2. 您可能正在同时执行比您打算的更多的工作(最大DOP不能得到尊重)

目前似乎没有任何好的API可以避免这种情况。


2
@SouvikGhosh 哎呀,确实 - Parallel.ForEach 有“action”主体(没有 Func<T> 主体),所以:如果我们使用 async 版本,它将使用 async void,这是非常糟糕的GetCountryGetState 是可等待方法吗? - Marc Gravell
@SouvikGhosh 我已经进行了编辑以阻止/解释这一点 - 这是我的错误指导,对不起;我认为在这里没有什么好的选择 - 你可以自己编写“max-dop感知并行异步”代码,但是... - Marc Gravell
@SouvikGhosh 要明确一点:在问题中添加更多的“任务”也无法解决问题。 - Marc Gravell
谢谢,我会看看能否找到类似的东西。 - Souvik Ghosh
@IanRingrose 这样做不会给你最大并行度。 - Marc Gravell
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接