限制异步任务的执行速率

68
我想同时运行一些异步任务,并限制在任何给定时间内等待完成的任务数量。比如你有1000个URL,每次只想打开50个请求;但是只要一个请求完成,就会打开到列表中下一个URL的连接。这样,始终只有50个连接处于打开状态,直到URL列表耗尽。
如果可能的话,我也想利用给定数量的线程。
我已经编写了一个扩展方法ThrottleTasksAsync来实现我的愿望。但是是否有更简单的解决方案?我认为这是一种常见情况。
用法:
class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

以下是代码:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

该方法利用BlockingCollectionSemaphoreSlim使其工作。限流器在一个线程上运行,而所有异步任务都在另一个线程上运行。为了实现并行处理,我添加了一个maxDegreeOfParallelism参数,该参数传递给重新命名为while循环的Parallel.ForEach循环。

旧版本如下:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

然而,线程池很快就会耗尽,你无法使用 async/await

额外奖励: 为了避免在调用CompleteAdding()时,在Take()中抛出异常的问题,我正在使用带有超时的TryTake重载。如果不使用TryTake中的超时,那么使用BlockingCollection的目的就会失去意义,因为TryTake不会阻塞,是否有更好的方法?理想情况下,应该有一个TakeAsync方法。


6
有更好的方法吗?是的,使用TPL Dataflow - Scott Chamberlain
对于URL示例,您可以将所有URL放入ConcurrentBag中,启动50个线程,在每个线程中获取一个URL并执行请求,直到Bag为空。 - Bogdan
一般情况下,使用委托的 ConcurrentBag :) - Bogdan
@Bogdan 我将会执行成千上万个请求,我想使用await在同一个线程上运行它们。Parallel.ForEach可以实现2或4个并发的while循环效果。 - Josh Wyant
我曾经问过类似的问题(https://dev59.com/tWEi5IYBdhLWcg3wjs1j)。Dataflow和Rx似乎是最有趣的选择。同时,我已经测试了Dataflow并且它运行得非常好。 - usr
显示剩余3条评论
3个回答

66

如建议所示,使用TPL Dataflow

一个TransformBlock<TInput, TOutput>可能是你正在寻找的。

您可以定义MaxDegreeOfParallelism来限制并行转换字符串(即下载的url数量)。 然后将url发布到块中,完成后告诉块您已经完成添加项目,并提取响应。

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}
注意: TransformBlock 缓冲其输入和输出。那么,我们为什么需要将其链接到 BufferBlock
因为 TransformBlock 不会完成直到所有项目(HttpResponse)都被消耗完,而且 await downloader.Completion 会挂起。相反,我们让 downloader 将其所有输出转发到一个专用的缓冲块——然后等待 downloader 完成,并检查缓冲块。

@JoshWyant 你的意思是限制同时下载的URL数量吗?可以使用 MaxDegreeOfParallelism - dcastro
假设你有2个核心。不要创建50个线程,而是将MaxDegreeOfParallelism设置为2。在每个核心上,您想要打开25个异步http请求。当一个请求完成时,另一个核心上有49个请求等待处理。await SendAsync()将允许您再次发布一个Task<HttpResponse>,将挂起的请求数量恢复到50? - Josh Wyant
2
@JoshWyant 通过上面的代码,你可以随意发布任意数量的URL(使用SendAsync)。这些URL将被缓冲到块中。该块将继续从缓冲区中获取URL并且每次最多处理50个。然后将结果放入另一个缓冲区中。TransformBlock会同时缓冲其输入和输出。 - dcastro
@JoshWyant 如果我理解正确,我会这样做:使用一个TransformBlock,按顺序执行GET和POST请求,并限制为15个。这样,最多同时进行15个请求(例如10个GET + 5个POST)。然后,将其链接到另一个ActionBlock,将先前的输出存储在数据库中。你觉得这样对吗?此外,请确保使用适当的异步API以实现真正的异步I/O。 - dcastro
2
@dcastro,最终我选择了Dataflow解决方案。我最初的担心是MaxDegreeOfParallelismParallel.ForEach完全相同,只是创建任意数量的线程以实现并行性。但我错了,这个参数在async中非常好用。Tpl.Dataflow非常出色。谢谢! - Josh Wyant
显示剩余9条评论

65
假设你有1000个URL,你只想同时打开50个请求;但是一旦一个请求完成,你就会打开到列表中下一个URL的连接。这样,总是恰好有50个连接处于打开状态,直到URL列表用尽。
下面的简单解决方案已经在SO上出现了很多次。它不使用阻塞代码,也不显式创建线程,因此它的可扩展性非常好:
const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

事实上,下载数据后的处理应该在一个不同的流水线上进行,具有不同的并行级别,特别是在处理受CPU限制的情况下。例如,您可能希望同时使用4个线程进行数据处理(即CPU核心数),并且最多有50个待处理的数据请求(完全不使用线程)。据我所知,这不是您当前代码正在执行的操作。
这就是TPL Dataflow或Rx作为首选解决方案的地方。但是,使用纯TPL也肯定可以实现这样的内容。请注意,此处唯一阻塞的代码是在Task.Run内部执行实际数据处理的代码:
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}

3
这是最简单和最直接的答案。基本上就是我试图做的事情。我的错误在于尝试在一个单独的线程上运行信号量,但这使得它变得更简单,并消除了 BlockingCollection。我只是没有意识到可以以那种方式使用 WaitAsync。谢谢 @Noseratio。 - Josh Wyant
@JoshWyant,没问题。我相信如果TPL Dataflow的管道被正确设计和组装,这基本上就是它在幕后所做的事情。只是我缺乏足够的TPL Dataflow技能,但我会投入更多时间去学习它。 - noseratio - open to work
3
如果您在访问相同的端点时使用 .net core 中默认的 new HttpClient() 进行测试,请注意。默认情况下,它会限制每个服务器的连接数(在 fiddler 中看到它限制为 2),除非您指定 new HttpClient(new HttpClientHandler { MaxConnectionsPerServer = ... })。此答案中的所有内容都按照其所述方式工作,但您仍然可能受到该设置的限制。 - tstojecki
1
文档中说:*SemaphoreSlim类表示一种轻量级、快速的信号量,可用于在单个进程内等待,当预计等待时间非常短时。* 我认为,这意味着相比原始的Semaphore(它总是包装一个Win32信号量对象),SemaphoreSlim更适用于短时间等待的优化。但这并不意味着SemaphoreSlim仅适用于短时间等待,并且在我看来也不应该避免长时间等待。 - noseratio - open to work
1
Semaphore比TPL数据流更简单,适合新手使用 :-) - CodingNinja
显示剩余4条评论

5

根据要求,这是我最终使用的代码。

工作以主从配置设置,并且每个主节点都作为批处理进行处理。每个工作单元都按照以下方式排队:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

大师被逐一缓冲以节省其他外部进程的工作。每个主要细节通过 "masterTransform" "TransformManyBlock" 分派到工作中。同时创建了一个 "BatchedJoinBlock" 来在一个批次中收集详细信息。
实际工作是在 "detailTransform" "TransformBlock" 中完成的,异步地进行,每次处理 150 条记录。"BoundedCapacity" 设置为 300,以确保太多主不会在链的开头被缓冲,同时也为足够的详细记录排队留出空间,以允许一次处理 150 条记录。该块向其目标输出一个对象,因为它根据链接过滤跨链接取决于它是 "Detail" 还是 "Exception"。
"batchAction" "ActionBlock" 收集所有批次的输出,并为每个批次执行批量数据库更新、错误日志记录等操作。
将有多个 "BatchedJoinBlock",一个用于每个主。由于每个 "ISourceBlock" 顺序输出,每个批次只接受与一个主关联的详细记录数量,因此批次将按顺序进行处理。每个块只输出一组,并在完成后取消链接。只有最后一个批处理块将其完成传播到最终的 "ActionBlock"。
数据流网络如下:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接