在并行中运行异步方法

54

我有一个异步方法,GetExpensiveThing(),它执行一些昂贵的I/O操作。以下是我使用它的方式:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = await GetExpensiveThing();
    var second = await GetExpensiveThing();
    return new List<Thing>() { first, second };
}

但由于这是一种昂贵的方法,我希望可以并行执行这些调用。我本以为移动 "await" 就能解决这个问题:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = GetExpensiveThing();
    var second = GetExpensiveThing();
    return new List<Thing>() { await first, await second };
}

那个方法没用,所以我用一些任务包装它们,这个方法可以用:

// Parallel execution
public async Task<List<Thing>> GetThings()
{
    var first = Task.Run(() =>
    {
        return GetExpensiveThing();
    });

    var second = Task.Run(() =>
    {
        return GetExpensiveThing();
    });

    return new List<Thing>() { first.Result, second.Result };
}

我甚至尝试在任务中使用awaits和async,但它变得非常混乱,并且没有成功。

有更好的方法以并行方式运行异步方法吗?还是任务是一个不错的方法?


3
你误解了被链接的帖子。它正确地陈述了 async 函数中的 Continuations 是在捕获的上下文中调度的。但在大多数情况下,这个上下文是默认的 SynchronizationContext,它将 Continuations 调度到线程池中,导致它们并行运行。即使在 WPF 和 ASP 应用程序中,你也可以通过使用 ConfigureAwait(false) 来解决这个问题。Task.Run 用于 CPU 绑定任务,并不需要它来并行运行 Continuations。 - V0ldek
@V0ldek 是的,你说得对。谢谢你提醒我注意这个问题。 - Andrii Viazovskyi
4个回答

54
是的,“最好”的方法是利用Task.WhenAll方法。但是,第二种方法应该已经并行运行了。我创建了一个.NET Fiddle,这应该有助于阐明问题。您的第二种方法实际上应该是并行执行的。我的演示证明了这一点!请考虑以下内容:
public Task<Thing[]> GetThingsAsync()
{
    var first = GetExpensiveThingAsync();
    var second = GetExpensiveThingAsync();

    return Task.WhenAll(first, second);
}

注意

建议使用“Async”后缀替代GetThingsGetExpensiveThing,应分别使用GetThingsAsyncGetExpensiveThingAsync - 参见 来源


3
await Task.WhenAll 将返回 Thing[] 数组,因此不需要使用 Result(事实上,Result 会包装异常;你应该使用 await 或使用 await Task.WhenAll 的结果)。 - Stephen Cleary
2
好的,谢谢你指出来 - 我还没喝咖啡,有点迷糊。 :) - David Pine
1
虽然这通常是正确的方式,但 return new List<Thing>() { await first, await second }; 这种写法有什么问题吗?如果提问者说这不起作用,那肯定还有其他原因在起作用... - yaakov
1
这对我不起作用。这些方法仍然按顺序执行。我似乎只能使用那些任务来实现我想要的东西。GetExpensiveThing() 的内部实现是一个异步方法,这会对此产生任何影响吗? - Dave New
@davenewza GetExpensiveThing 的实现是什么样子的? - David Pine
显示剩余3条评论

36

Task.WhenAll()在同时启动大量任务时容易变得不够高效 - 除非进行适度的限制/节流。

如果您正在执行大量任务列表并希望等待最终结果,则建议使用具有并行度限制的partition

我已修改Stephen Toub的博客中优雅的方法来适应现代LINQ:

public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
{
    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                 await Task.Yield(); // prevents a sync/hot thread hangup
                 await funcBody(partition.Current);
            }
        }
    }

    return Task.WhenAll(
        Partitioner
            .Create(source)
            .GetPartitions(maxDoP)
            .AsParallel()
            .Select(p => AwaitPartition(p)));
}

它的工作原理很简单,将IEnumerable分解成相对均匀的分区,同时对每个元素在每个分区中同时执行一个函数/方法。每个分区中最多只有一个元素,但n个任务在n个分区中运行。

扩展用法:

await myList.ParallelForEachAsync(myFunc, Environment.ProcessorCount);

编辑: 如果需要更多选项,现在我将一些重载保留在Github存储库中。也有一个适用于NetStandard的NuGet。

编辑2:由于Theodor在下面的评论中提到的问题,我使用await Task.Yield();来解决异步任务写得不好会阻止并行性的问题。


1
好的!如果您想使用Stephen Toub的方法但更喜欢使用方法语法的LINQ,这很完美。 - David Tarulli
我很高兴你喜欢它,那正是我想要的 :) - HouseCat
1
"AsParallel" 是多余的。其后面的 "Select" 没有进行任何 CPU 密集型工作。此外,如果从 Stephen Toub 的代码中移除 "Task.Run",则可能会降低并行度,特别是当 "funcBody" 方法需要大量 CPU 资源且同步完成时。 - Theodor Zoulias
@DanHunex 将 Task.Run 替换为 AsParallel 会引入人为的并行度限制,因为缺少配置 WithDegreeOfParallelism,所以默认的 Environment.ProcessorCount 被使用。请查看 fiddle,请求的 maxDoP: 10 没有被遵守。然后注释掉 .ParallelForEachAsync_HouseCat 这一行,并取消注释 //.ParallelForEachAsync_StephenToub 这一行,可以看到现在请求的 DOP 被遵守了。 - Theodor Zoulias
我同意Dan Hunex的观点,但我将重新测试代码实现的质量。 - HouseCat
显示剩余7条评论

3
你可以使用Task.WhenAll,它会在所有依赖的任务完成后返回。
参考此问题:这里

3
如果GetExpensiveThing正确的异步(意味着它不会同步执行任何IO或CPU工作),您的第二个解决方案调用两个方法然后等待结果应该可以工作。您也可以使用Task.WhenAll
但是,如果它不是异步的,您可以将每个任务提交到线程池,并使用Task.WhenAll组合器获得更好的结果,例如:
public Task<IList<Thing>> GetThings() =>
    Task.WhenAll(Task.Run(() => GetExpensiveThing()), Task.Run(() => GetExpensiveThing()));

注意我将返回类型更改为 IList,以避免完全使用 await。您应该避免使用 Result 属性,因为它会导致调用线程阻塞并等待任务完成,而不像 awaitTask.WhenAll 使用延续。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接