如何限制多个异步任务的速度?

14

我有一些如下形式的代码:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}

问题是,如果我不控制线程,事情就会开始崩溃。现在,我想启动最多throttle个线程,并且只有当旧线程完成时才启动新线程。我尝试了一些方法,但到目前为止没有任何方法可以解决问题。我遇到的问题包括:

  • tasks集合必须完全填充所有任务,无论是活动的还是等待执行的,否则最终的.Wait()调用只会查看它启动的线程。
  • 链接执行似乎需要使用Task.Run()或类似的方法。 但是我需要从一开始就对每个任务有一个引用,而实例化任务似乎会自动启动它,这正是我不想要的。

如何做到这一点?


也许将一定数量的“节流”任务添加到列表中,并每次运行它们。 - Clive DM
@CliveDM 我不太清楚你的意思。也许可以发一个带有样例代码的答案? - Shaul Behr
http://blog.danskingdom.com/tag/c-task-thread-throttle-limit-maximum-simultaneous-concurrent-parallel/ - Niklas Peter
9个回答

29

首先,抽象出线程的概念。特别是当您的操作是异步的时候,您根本不应该考虑“线程”。在异步世界中,您有任务,并且相对于线程可以拥有大量的任务。

使用SemaphoreSlim可以限制异步代码:

static async Task DoSomething(int n);

static void RunConcurrently(int total, int throttle) 
{
  var mutex = new SemaphoreSlim(throttle);
  var tasks = Enumerable.Range(0, total).Select(async item =>
  {
    await mutex.WaitAsync();
    try { await DoSomething(item); }
    finally { mutex.Release(); }
  });
  Task.WhenAll(tasks).Wait();
}

17

在我看来,最简单的选择是使用TPL Dataflow。您只需创建一个ActionBlock,将其限制在所需的并行度范围内并开始向其中发布项目。它确保仅同时运行一定数量的任务,并在任务完成时开始执行下一个项目:

async Task RunAsync(int totalThreads, int throttle) 
{
    var block = new ActionBlock<int>(
        DoSomething,
        new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle });

    for (var n = 0; n < totalThreads; n++)
    {
        block.Post(n);
    }

    block.Complete();
    await block.Completion;
}

8
如果我理解正确,您可以通过throttle参数限制启动任务的数量,并在启动下一个任务之前等待它们完成。
要等待所有已启动的任务完成后再启动新任务,请使用以下实现。
static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    await Task.WhenAll(tasks); // wait for remaining
}

要在完成任务时添加任务,您可以使用以下代码:

要在完成任务时添加任务,您可以使用以下代码

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            var completed = await Task.WhenAny(tasks);
            tasks.Remove(completed);
        }
    }
    await Task.WhenAll(tasks); // all threads must complete
}

@ShaulBehr 是的,就是这样。 - Clive DM
1
除了这个会等待每个批次完成之外,它与关闭操作相同,而不是在一个线程完成时立即启动新线程。 - Shaul Behr
1
@ShaulBehr 我更新了我的回答(尚未测试)。看看是否有帮助。 - Sriram Sakthivel

7

Stephen Toub在他的基于任务的异步模式文档中给出了以下限流示例。

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

7
微软的响应式扩展(Rx) - NuGet "Rx-Main" - 已经非常好地解决了这个问题。
只需执行以下操作:
static void RunThreads(int totalThreads, int throttle) 
{
    Observable
        .Range(0, totalThreads)
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Wait();
}

任务完成。


6

.NET 6 引入了Parallel.ForEachAsync。你可以像这样重写代码:

static async ValueTask DoSomething(int n)
{
    ...
}

static Task RunThreads(int totalThreads, int throttle)
    => Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));

注意事项:

  • 我不得不将您的DoSomething函数的返回类型从Task更改为ValueTask
  • 您可能希望避免调用.Wait(),因此我将RunThreads方法设置为异步。
  • 从您的示例代码中不明显为什么需要访问单个任务。该代码不会提供对任务的访问,但在许多情况下仍然可能有所帮助。

这个使用Task.Run启动新线程吗?还是只是像Task.WhenAll的限制版本一样限制awaits的数量? - undefined

2
以下是一些扩展方法变体,可以在Sriram Sakthivel的答案上进行扩展。
在使用示例中,对DoSomething的调用被包装在一个明确转换的闭包中,以允许传递参数。
public static async Task RunMyThrottledTasks()
{
    var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    await myArgsSource
        .Select(a => (Func<Task<object>>)(() => DoSomething(a)))
        .Throttle(2);
}

public static async Task<object> DoSomething(int arg)
{
    // Await some async calls that need arg..
    // ..then return result async Task..
    return new object();
}

public static async Task<IEnumerable<T>> Throttle<T>(IEnumerable<Func<Task<T>>> toRun, int throttleTo)
{
    var running = new List<Task<T>>(throttleTo);
    var completed = new List<Task<T>>(toRun.Count());
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
            completed.Add(comTask);
        }
    }
    return completed.Select(t => t.Result);
}

public static async Task Throttle(this IEnumerable<Func<Task>> toRun, int throttleTo)
{
    var running = new List<Task>(throttleTo);
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
        }
    }
}

-1

您实际上可以模拟 .NET 6 中引入的 Parallel.ForEachAsync 方法。为了模拟相同的方法,您可以使用以下代码。

public static Task ForEachAsync<T>(IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body) {
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

1
这个解决方案有缺陷,原因在这里已经解释了,因此我投了反对票。 - Theodor Zoulias
你建议使用 SemaphoreSlim 而不是 @TheodorZoulias 吗? - Nayanava
1
是的,SemaphoreSlim 是一个可靠的解决方案,并且清晰地传达了其意图。 - Theodor Zoulias

-1

没有必要手动实现TaskScheduler。你可以直接使用ConcurrentExclusiveSchedulerPair.ConcurrentScheduler。但更大的问题是TaskSchedulers 完全无法对异步操作进行节流。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接