我想同时运行一些异步任务,并限制在任何给定时间内等待完成的任务数量。比如你有1000个URL,每次只想打开50个请求;但是只要一个请求完成,就会打开到列表中下一个URL的连接。这样,始终只有50个连接处于打开状态,直到URL列表耗尽。
如果可能的话,我也想利用给定数量的线程。
我已经编写了一个扩展方法
用法:
如果可能的话,我也想利用给定数量的线程。
我已经编写了一个扩展方法
ThrottleTasksAsync
来实现我的愿望。但是是否有更简单的解决方案?我认为这是一种常见情况。用法:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
以下是代码:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
该方法利用BlockingCollection
和SemaphoreSlim
使其工作。限流器在一个线程上运行,而所有异步任务都在另一个线程上运行。为了实现并行处理,我添加了一个maxDegreeOfParallelism参数,该参数传递给重新命名为while
循环的Parallel.ForEach
循环。
旧版本如下:
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
然而,线程池很快就会耗尽,你无法使用 async
/await
。
额外奖励:
为了避免在调用CompleteAdding()
时,在Take()
中抛出异常的问题,我正在使用带有超时的TryTake
重载。如果不使用TryTake
中的超时,那么使用BlockingCollection
的目的就会失去意义,因为TryTake
不会阻塞,是否有更好的方法?理想情况下,应该有一个TakeAsync
方法。
await
在同一个线程上运行它们。Parallel.ForEach
可以实现2或4个并发的while
循环效果。 - Josh Wyant