如何限制并发异步I/O操作的数量?

149
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

这里有一个问题,它会开始1000多个同时的网络请求。是否有一种简单的方法来限制这些异步http请求的并发数量?以便在任何给定时间内下载的网页不超过20个。如何以最有效的方式做到这一点?


3
这与您之前的问题有何不同? - svick
1
使用ParallelOptions参数。 - Chris Disley
4
@ChrisDisley,这只会并行启动请求。 - spender
3
此处提到 HttpClient 实现了 IDisposable 接口,需要对其进行释放,尤其是当你要使用1000个以上的实例时。可以将 HttpClient 用作多个请求的单例。 - Shimmy Weitzhandler
3
@Shimmy,你永远不应该处理 HttpClient:https://dev59.com/WGUo5IYBdhLWcg3w3ymi#15708633 - avs099
显示剩余3条评论
12个回答

216

您可以在最新版本的 .NET 中,使用 .NET 4.5 Beta 来实现异步操作。之前“usr”发布的文章指出了 Stephen Toub 写的一篇好文章,但是不太被人知道的是,异步信号量已经被纳入到 .NET 4.5 的 Beta 版本中。

如果您看一下我们心爱的SemaphoreSlim类(由于其性能优于原始的Semaphore,所以您应该使用它),现在它拥有了WaitAsync(...)系列重载方法,包括所有预期的参数——超时时间间隔、取消标记和您通常的调度伙伴们 :)

Stephen还写了一篇关于新的.NET 4.5好处的最新博客文章,该版本与beta一起发布,请参见.NET 4.5 Beta中的并行性有什么新功能

最后,这里是有关如何使用SemaphoreSlim进行异步方法限流的示例代码:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

最后,但可能值得一提的是一种使用基于TPL调度的解决方案。您可以在TPL上创建未启动的委托绑定任务,并允许自定义任务调度程序限制并发性。事实上,这里有一个MSDN示例:

另请参见 TaskScheduler


3
使用有限度的并行性的Parallel.ForEach是否是更好的方法?http://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.maxdegreeofparallelism.aspx - GreyCloud
2
为什么不释放你的 HttpClient - Shimmy Weitzhandler
7
Parallel.ForEach 可以与同步代码一起使用,这使得你可以调用异步代码。 - Josh Noe
62
鉴于这个答案非常受欢迎,值得指出 HttpClient 应该是一个通用的单一实例,而不是每次请求都要创建一个实例。 - Rupert Rawnsley
5
在这里,使用Task.Run()是必要的,因为如果您正常等待(await)请求完成,那么请求将按顺序一个接一个地处理(因为它在等待请求完成后才继续循环的其余部分),而不是并行处理。但是,如果您不等待请求,则会在任务被调度后立即释放信号量(允许所有请求同时运行),这就打败了首次使用它的目的。由 Task.Run 创建的上下文仅是一个持有信号量资源的位置。 - Nick
显示剩余10条评论

26

如果你有一个IEnumerable(即URL字符串),并且想要对每个URL进行I/O绑定操作(例如执行异步的http请求)并发地进行操作,并且可以选择实时设置最大并发I/O请求数量,那么你可以按照以下方式操作。这种方法不使用线程池等,而是使用SemaphoreSlim来控制最大并发I/O请求,类似于滑动窗口模式,其中一个请求完成后,将释放信号量,下一个请求会进入。

用法:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }

我需要Dispose一个SemaphoreSlim吗? - AgentFire
不需要在此实现和使用中显式处理SemaphoreSlim,因为它在方法内部使用,并且该方法不访问其AvailableWaitHandle属性。否则,我们将需要在using块中进行处理或包装。 - Dogu Arslan
3
考虑我们教给他人的最佳实践和经验教训。 使用 using 会很好。 - AgentFire
好的,这个例子我能理解,但是我正在努力寻找最好的方法来做到这一点。基本上,我有一个节流器,但我的函数将返回一个列表,我最终希望在完成时得到全部列表...这可能需要锁定列表,你有什么建议吗? - Seabizkit
1
你可以稍微更新一下这个方法,使其返回实际任务的列表,并在调用代码中使用 await Task.WhenAll。一旦 Task.WhenAll 完成,你就可以枚举列表中的每个任务并将其列表添加到最终列表中。更改方法签名为: 'public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>( IEnumerable<TIn> inputEnumerable, Func<TIn, Task<TOut>> asyncProcessor, int? maxDegreeOfParallelism = null)' - Dogu Arslan
显示剩余2条评论

18
发布 .NET 6(在2021年11月),对于除了 ASP.NET 之外的所有应用程序而言,限制并发异步 I/O 操作数量的推荐方法是使用 Parallel.ForEachAsync API,并配置 MaxDegreeOfParallelism。以下是实际使用示例:
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

在上面的示例中,Parallel.ForEachAsync 任务以异步方式进行等待。如果需要,您还可以同步地Wait它,这将阻塞当前线程直到所有异步操作完成。同步的Wait 的优点是,在出现错误时,所有异常都会传播。相反,await运算符只按设计传播第一个异常。如果这是一个问题,您可以在这里找到解决方案。
关于ASP.NET的注意事项:Parallel.ForEachAsync API通过在线程池上启动多个工作器(任务),并且所有工作器都以并行方式调用body委托来工作。这与MSDN文章“异步编程:介绍ASP.NET上的异步/等待”中提供的建议相悖:
“您可以通过等待Task.Run来启动一些后台工作,但这样做没有意义。实际上,这将通过干扰ASP.NET线程池的启发式算法来损害可伸缩性。如果您在ASP.NET上有需要进行CPU绑定工作,最好直接在请求线程上执行。作为一般规则,在ASP.NET上不要将工作排队到线程池。”
因此,在ASP.NET应用程序中使用Parallel.ForEachAsync可能会损害应用程序的可伸缩性。在ASP.NET应用程序中,并发是可以接受的,但应避免并行处理。
从目前提交的答案中,只有Dogu Arslan's的答案适用于ASP.NET应用程序,尽管在异常情况下它的行为并不理想(即在错误发生时,Task可能无法足够快地完成)。

11

使用信号量的直接方法在错误情况下存在许多陷阱,因此我建议使用AsyncEnumerator NuGet包来代替重新发明轮子:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

9
如前面的帖子所述,除非你真的喜欢在生产环境中遇到 socket 耗尽问题,否则不应在任何循环中创建新的 HttpClients。请注意不要改变原意。 - CajunCoding

7

不幸的是,.NET Framework缺少用于编排并行异步任务的大多数重要组合器。没有内置此类工具。

看看由最受尊敬的Stephen Toub创建的AsyncSemaphore类。您需要的是称为信号量的东西,并且您需要其异步版本。


15
请注意:在 .NET 4.5 Beta 中,"不幸的是,.NET Framework 缺少用于编排并行异步任务的大部分重要组合器。没有内置的这种东西。" 的说法已经不再正确。SemaphoreSlim 现在提供了 WaitAsync(...) 功能 :) - Theo Yaung
SemaphoreSlim(具有其新的异步方法)是否应优先于AsyncSemphore,还是Toub的实现仍然具有一些优势? - Todd Menier
在我看来,应该优先选择内置类型,因为它很可能经过了充分的测试和精心设计。 - usr
6
Stephen在他的博客文章中回应了一个问题并补充说,使用SemaphoreSlim来处理.NET 4.5通常是比较合适的做法。 - jdasilva

3

SemaphoreSlim 在这里非常有帮助。以下是我创建的扩展方法:

/// <summary>Concurrently Executes async actions for each item of
/// <see cref="IEnumerable<typeparamref name="T"/></summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of
/// <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the
/// maximum degree of parallelism, Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel
/// is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    await action(item).ContinueWith(res =>
                    {
                        // action is completed, so decrement the number of
                        // currently running tasks
                        semaphoreSlim.Release();
                    }, TaskScheduler.Default);
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
    

使用示例:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

框架里还是没有内置这个功能吗? - Simon_Weaver
你是否曾经制作过 SelectAsyncConcurrent 版本的代码? - Simon_Weaver
@Simon_Weaver 我认为目前框架没有内置机制来实现这个。 - Jay Shah
1
@Simon_Weaver 不,我还没有构建SelectAsyncConcurrent版本,但那将是一个有趣的实现。 - Jay Shah
1
我刚刚写了一个非常笨拙的程序,它只是简单地调用了 ForEachAsyncConcurrent。因为我只需要在一个地方使用它,所以这样做还可以接受。我只创建了一个 ConcurrentStack,并在调用您的函数时向其中添加了项目。对于我来说,顺序并不重要,但如果其他人尝试,请不要使用 List,因为 a) 它不是线程安全的,b) 结果可能无法按照相同的顺序返回。 - Simon_Weaver
显示剩余2条评论

0
尽管可能会很快排队1000个任务,但“并行任务”库只能处理等于机器CPU核心数的并发任务。这意味着如果您有一台四核机器,在任何给定时间只有4个任务将被执行(除非您降低MaxDegreeOfParallelism)。

10
没错,但这与异步 I/O 操作无关。即使在单个线程上运行,上述代码也会启动1000多个同时下载。 - Grief Coder
那里没有看到 await 关键字。移除它应该可以解决问题,对吗? - scottm
2
该库肯定能够同时处理更多任务(具有“运行”状态)而不仅仅是核心数量。这在I/O绑定任务中尤其如此。 - svick
@svick:是的。你知道如何高效地控制最大并发TPL任务(而不是线程)吗? - Grief Coder

0
在较新版本的.NET(Core 1.0或更高版本)中,您可以使用内置的TPL Dataflow
using System.Threading.Tasks.Dataflow;

var client = new HttpClient();

var block = new TransformBlock<string, string>(
    client.GetStringAsync,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
foreach (string url in urls) {
    block.Post(url);
}
block.Complete();

string[] htmls = await block.ReceiveAllAsync().ToArrayAsync();

假设你确实需要接收到的内容,而使用Dataflow可以完成比这更复杂的任务。
请注意,您需要安装 System.Linq.Async 软件包以获得 ToArrayAsync 功能。
如评论中提到的那样,如果 GetStringAsync 失败,ReceiveAllAsync 可能会存在危险。在这种情况下,如果您希望停止管道并传播任何异常,不要使用 ReceiveAllAsync:
var htmls = new List<string>();
while (await block.OutputAvailableAsync())
{
    while (block.TryReceive(out string result))
    {
        htmls.Add(result);
    }
}
await block.Completion; // This propagates exceptions

或者如果您想继续,但记录所有异常:

var block = new TransformBlock<string, (string? html, Exception? exception)>(
    async url =>
    {
        try
        {
            return (await client.GetStringAsync(url), null);
        }
        catch (Exception e)
        {
            return (null, e);
        }
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);

(string? html, Exception? exception)[] results =
    await block.ReceiveAllAsync().ToArrayAsync();

这里有几个问题。所有的URL都是一开始就发布在块中,因此不适用于大型(迭代生成的)输入序列。GetStringAsync抛出的OperationCanceledException被忽略了设计如此。由于ReceiveAllAsync中的一个错误,所有异常都被忽略了。因为这些原因,我不同意“你应该”的说法。 - Theodor Zoulias
现在情况好了一些,但我仍然不同意“你应该”的说法,因为它暗示了 TPL Dataflow 明显是这个问题的最佳解决方案。在我看来,Parallel.ForEachAsync API 至少和 TPL Dataflow 一样好,甚至更好。 - Theodor Zoulias
@TheodorZoulias 我不认为Parallel.ForEachAsync是一个等价的替代品。你需要一个线程安全的容器(比如ConcurrentBag<T>)来存储结果,但这并不能保证顺序。PLINQ(urls.AsParallel().WithDegreeOfParallelism(8).Select...)更接近于等价替代,但它会阻塞。无论如何,为了澄清答案,我将措辞改为“你可以”。 - RcINS
当然,Parallel.ForEachAsync并不会收集结果,但问题中没有提及结果。原帖作者只是想知道如何限制异步I/O操作的并发度。一个更与您的回答相关的问题是:ForEachAsync with Result - Theodor Zoulias

-1

这是一个方便的扩展方法,您可以创建一个列表任务来封装它们,以便以最大并发度执行:

/// <summary>Allows to do any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
    return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
    {
        try { return task; }
        finally { maxOperations.Release(); }
    }).Unwrap());
}

现在不再是:

await Task.WhenAll(someTasks);

你可以走了

await Task.WhenAll(someTasks.WithMaxConcurrency(20));

-1

这不是一个好的实践,因为它会改变全局变量。而且对于异步操作来说也不是一个通用的解决方案。但如果你只想针对所有的 HttpClient 实例进行操作,那么这种方法很简单,你可以尝试:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接