如何限制并发异步I/O操作的数量?

149
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

这里有一个问题,它会开始1000多个同时的网络请求。是否有一种简单的方法来限制这些异步http请求的并发数量?以便在任何给定时间内下载的网页不超过20个。如何以最有效的方式做到这一点?


3
这与您之前的问题有何不同? - svick
1
使用ParallelOptions参数。 - Chris Disley
4
@ChrisDisley,这只会并行启动请求。 - spender
3
此处提到 HttpClient 实现了 IDisposable 接口,需要对其进行释放,尤其是当你要使用1000个以上的实例时。可以将 HttpClient 用作多个请求的单例。 - Shimmy Weitzhandler
3
@Shimmy,你永远不应该处理 HttpClient:https://dev59.com/WGUo5IYBdhLWcg3w3ymi#15708633 - avs099
显示剩余3条评论
12个回答

-2

基本上,您需要为要访问的每个URL创建一个操作或任务,将它们放入列表中,然后处理该列表,限制可以并行处理的数量。

我的博客文章展示了如何使用任务和操作来完成此操作,并提供了一个示例项目,您可以下载并运行以查看两者的效果。

使用操作

如果使用操作,则可以使用内置的.Net Parallel.Invoke函数。在这里,我们将其限制为最多同时运行20个线程。

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

使用任务

在任务中,没有内置的函数。但是,您可以使用我在博客上提供的函数。

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

然后创建您的任务列表并调用函数以使它们运行,例如每次最多同时运行20个,您可以这样做:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);

我认为你只是在为SemaphoreSlim指定initialCount,而你需要在SemaphoreSlim的构造函数中指定第二个参数即maxCount。 - Jay Shah
我想将每个任务的响应处理成一个列表。我该如何获取返回结果或响应? - venkat

-2

并行计算应该用于加速 CPU 绑定操作。这里我们谈论的是 I/O 绑定操作。你的实现应该是 purely async,除非你正在超载多核 CPU 上繁忙的单个内核。

编辑 我喜欢 usr 提出的在这里使用 "async semaphore" 的建议。


好观点!虽然这里的每个任务都包含异步和同步代码(页面异步下载,然后以同步方式处理)。我正在尝试将代码的同步部分分布在CPU上,并同时限制并发异步I/O操作的数量。 - Grief Coder
不要在线程池中运行长时间运行/阻塞操作。@SeanU您的建议是不良实践,可能会导致许多意外和不良副作用。 - spender
3
我认为这个回答没有提供答案。仅仅是异步的还不够:我们真正想要的是以非阻塞的方式限制物理IO操作。 - usr
在理想情况下,“整个池子”应该只代表系统中的处理器数量。任何更大的值都表示ThreadPool过度紧张。因为ThreadPool不愿意启动额外的线程,只有在持续压力下才会这样做,所以依赖于流畅ThreadPool的其他操作现在将受到这种隐式延迟的影响。例如:System.Threading.Timer在ThreadPool上触发其回调。现在,由于ThreadPool中只有少量长期任务,它们无法按时到达。 - spender
1
嗯...我不确定我是否同意...当处理大型项目时,如果有太多开发人员持这种观点,即使每个开发人员的单独贡献不能将事情推向边缘,你会遇到饥饿问题。考虑到只有一个线程池,即使你半尊重地处理它...如果其他人也在这样做,问题就会随之而来。因此,我*始终建议不要在ThreadPool中运行长时间的任务。 - spender
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接