C# 任务线程池 - 在仅使用10个线程的情况下运行100个任务

9
我想知道有没有人能指引我一下关于async/await框架和线程池的问题?
基本上,我想要做的是在一个最大线程数为y的异步线程中执行x个操作。
例如,假设我有100个数据库操作: await _repository.WriteData(someData); 我希望有一种方法可以每次同时运行10个操作(最好是分别在10个线程中),并且在每个操作完成后,在空闲线程上启动下一个操作。然后我们等待所有操作完成和所有线程结束...
这是否可以在不太费力或添加过多复杂性的情况下轻松实现?

为什么不直接使用await,让框架为您处理线程呢?您是否进行过性能测试,以确定该框架不适合您的需求? - Gabriel Sadaka
@gabriel,框架不知道最佳IO并行性。它怎么可能知道呢? - usr
@Eser,我同意这是一个重复的问题,但是对于这个问题给出的答案已经更高质量了,所以这看起来很有前途。 - Kirill Shlenskiy
7
等一下,我有点糊涂了。如果它们是IO操作,那么为什么还需要任何线程?你不会因为收到一封信就雇佣一个工人负责在邮箱旁等待这封信。为什么要雇佣一堆线程来等待数据库呢? - Eric Lippert
不,它并不会,但在这种情况下,示例是针对数据库操作的,数据库服务器将负责处理并行IO操作。这实际上取决于用例,而他尚未指定。 - Gabriel Sadaka
2个回答

21

我认为你关注的是线程,特别是对于不需要线程执行的异步操作,你可能错过了重点。

.NET拥有一个很好的ThreadPool可供使用。你不知道里面有多少个线程,也不用在意,因为它可以自动运行(除非它出故障并且你需要自己配置,但那很高级)。

ThreadPool上运行任务非常简单。要么为每个操作创建一个任务并使用SemaphoreSlim来限制它们的数量,要么使用现成的TPL Dataflow块。例如:

var block = new ActionBlock<SomeData>(
    _ => _repository.WriteDataAsync(_), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time

foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}

block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.

然而,如果您确实打算创建“专用”线程,可以使用Task.Factory.StartNewLongRunning选项来创建一个位于ThreadPool之外的专用线程。但请记住,异步操作在整个操作期间不会保持相同的线程,因为异步操作不需要线程。因此,在专用线程上开始可能是没有意义的(有关更多信息,请参见我的博客:LongRunning Is Useless For Task.Run With async-await


1
TPL Dataflow非常适合所述问题。不确定为什么会有人对此进行反对。 - Kirill Shlenskiy
2
TPL Dataflow不是“内置”的。我从你的博客中学到了这一点,哈哈。 - hyankov
@i3arnon 如何在完成10项任务中的任意一项时向队列添加新任务。这意味着向队列添加一个新任务。我的目标是举例说明,如果我们已经通过SemaphoreSlim或MaxDegreeOfParallelism设置了单次运行10个任务的限制,但我不想创建100个任务,然后通过SemaphoreSlim或MaxDegreeOfParallelism来控制它们以在单次运行10个任务。 我只想在完成10个任务中的任何一个任务时创建一个新任务,并且此过程将无限继续进行。 - virender
当ActionBlock完成时,我如何按添加项目的顺序获取结果? - dellos
请使用 TransfromBlock - i3arnon

14

@i3arnon的回答是正确的。使用TPL Dataflow。

本答案的其余部分仅用于教育目的和特殊用例。

最近我在一个项目中遇到了类似的问题,其中我不能引入任何外部依赖项,因此我不得不自己编写负载平衡实现,结果非常简单(直到你开始连接取消和有序结果 - 但这超出了这个问题的范围)。

由于其他人已经解释过,异步操作时"10个专用线程"的要求没有意义,所以我将维护多达N个并发的Task实例来处理工作负载。

static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

    if (queue.Count == 0) {
        return;
    }

    List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Func<Task> taskFactory = queue.Dequeue();

            tasksInFlight.Add(taskFactory());
        }

        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        // Propagate exceptions. In-flight tasks will be abandoned if this throws.
        await completedTask.ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

使用方法:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};

await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);

...或者

IEnumerable<SomeData> someDataCollection = ... // Get data.

await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);

这个解决方案不会遭受负载均衡问题,这种问题通常在其他简单实现中很常见,特别是在任务持续时间存在差异且输入已被预分区的情况下(例如此示例)。

带有性能优化和参数验证版本:Gist


不错的解决方案,但有两个建议:1.使用LinkedList<Task>来跟踪正在进行的任务,因为它的插入/删除是O(1),在底层没有涉及到任何内存移动。2.如果一个任务抛出异常,在循环外部捕获它,然后在最终的正在进行列表上WaitAll,将所有异常收集到一个AggregateException中。这样你就可以考虑所有任务和它们的结束状态,而不是让它们潜在地超过你的InvokeAsync - Boris B.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接