当需要处理大量任务时,如何使用WhenAll?

3

我需要你的帮助找到最佳解决方案。这是我的原始代码:

public async Task Test()
{
    var tasks = new List<Task>();
    string line;
    using (var streamReader = File.OpenText(InputPath))
    {
        while ((line = streamReader.ReadLine()) != null)
        {
            tasks.Add(Process(line));
        }
    }

    await Task.WhenAll(tasks.ToArray());
}

private Task Process(string line)
{
    return Task.Run(() =>
    {
        Console.WriteLine(line);
    });
}

它将读取带有行的文件,并通过任务处理每一行。但是,如果文件有超过100万行,则任务数组会更大,这段代码仍然有效吗?还是我应该找另一个解决方案。请帮忙。谢谢。


4
我投票关闭此问题,因为它不属于此处,而应该在CodeReview上发布。 - Yuval Itzchakov
TPL 内部使用线程池,因此不应启动一百万个线程。但是不确定任务的性能表现如何。仅启动 8 个(核心数)并在这 8 个任务之间分割行可能会提高性能。 - Domysee
@YuvalItzchakov 这不是代码审查。提供代码以理解问题比写很多话更好。我需要的是你的帮助来解决我的问题。谢谢。 - Leo Vo
代码审查不是关于用言语详细阐述,而是关于审查你编写的代码并帮助你改进它,这正是你所要求的。 - Yuval Itzchakov
1个回答

3
这是一个不好的想法。这可能会启动太多的线程。
一个更好的方法是简单地使用Parallel.ForEach(),像这样:
using System;
using System.IO;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            string filename = @"Your test filename goes here";
            Parallel.ForEach(File.ReadLines(filename), process);
        }

        private static void process(string line)
        {
            Console.WriteLine(line);
        }
    }
}

这段代码没有使用async/await。但是,如果你想的话,你可以将对Parallel.ForEach()的整个调用包装在一个任务中。
或者,如果你想使用Task Parallel Library(一个Microsoft NuGet包),你可以这样做:
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            Task.Run(test).Wait();
        }

        static async Task test()
        {
            string filename = @"Your filename goes here";
            await processFile(filename);
        }

        static async Task processFile(string filename)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
            var action = new ActionBlock<string>(s => process(s), options);

            foreach (var line in File.ReadLines(filename))
                await action.SendAsync(line);

            action.Complete();

            await action.Completion;
        }

        static void process(string line)
        {
            Thread.Sleep(100);  // Simulate work.
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + line);
        }
    }
}

这将为您提供async支持。

附录:线程池速率控制的演示。

(这是针对shay__的评论做出的回应。)

如果您启动了许多长时间运行的任务,其中任务运行时间超过一秒左右,您可能会看到线程池速率控制。

如果当前进程的线程池线程数等于或超过调用ThreadPool.GetMinThreads(out workers, out ports);返回的worker计数,则会发生这种情况。

如果发生这种情况,在创建新的线程池线程之前,将会延迟一小段时间(在我的系统上为一秒)。通常情况下,这将允许另一个线程池线程变得可用,并且将使用它(这当然是速率控制的主要原因)。

以下代码演示了这个问题:

int workers, ports;
ThreadPool.GetMinThreads(out workers, out ports);
Console.WriteLine("Min workers = " + workers); // Prints 8 on my system.
var sw = Stopwatch.StartNew();

for (int i = 0; i < 100; ++i)
{
    Task.Run(() =>
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} started at time {sw.Elapsed}");
        Thread.Sleep(10000);
    });
}

Console.ReadLine();

在我的系统上,这将打印出以下内容:
Min workers = 8
Thread 3 started at time 00:00:00.0098651
Thread 6 started at time 00:00:00.0098651
Thread 8 started at time 00:00:00.0099841
Thread 5 started at time 00:00:00.0099680
Thread 7 started at time 00:00:00.0099918
Thread 4 started at time 00:00:00.0098739
Thread 10 started at time 00:00:00.0100828
Thread 9 started at time 00:00:00.0101833
Thread 11 started at time 00:00:01.0096247
Thread 12 started at time 00:00:02.0098105
Thread 13 started at time 00:00:03.0099824
Thread 14 started at time 00:00:04.0100671
Thread 15 started at time 00:00:05.0098035
Thread 16 started at time 00:00:06.0099449
Thread 17 started at time 00:00:07.0096293
Thread 18 started at time 00:00:08.0106774
Thread 19 started at time 00:00:09.0098193
Thread 20 started at time 00:00:10.0104156
Thread 3 started at time 00:00:10.0109315
Thread 8 started at time 00:00:10.0112171
Thread 7 started at time 00:00:10.0112531
Thread 9 started at time 00:00:10.0117256
Thread 4 started at time 00:00:10.0117920
Thread 10 started at time 00:00:10.0117298
Thread 6 started at time 00:00:10.0109381
Thread 5 started at time 00:00:10.0112276
Thread 21 started at time 00:00:11.0095859
Thread 11 started at time 00:00:11.0101189
Thread 22 started at time 00:00:12.0095421
Thread 12 started at time 00:00:12.0111173
Thread 23 started at time 00:00:13.0095932    ...

请注意前8个线程启动非常快,但是随后新的线程被限制在每秒钟左右一个,直到第一批线程终止并可以被重用。同时要注意,只有当线程花费相对较长的时间才能终止时,才会出现这种效果。

3
“这会启动太多的线程” - 我不确定是否正确。你能详细说明一下吗? - shay__
@shay__ 如果你对文件中的每一行都调用Task.Run(),它将尝试为每一行启动一个线程。在几个线程之后,它会开始为每个线程引入半秒的延迟,这样在简单的测试中将防止它创建过多的线程,因为先前的线程在新线程创建之前已经退出并被重用。但是,如果每行的处理速度足够慢,线程的数量将继续增加。 - Matthew Watson
@shay__ Task.Run() 将线程排队到线程池中,如果当前线程数超过 ThreadPool.GetMinThreads() 返回的工作线程数,则会在创建新线程之前引入半秒延迟。我将发布一些演示此功能的代码。 - Matthew Watson
1
请记住,进行磁盘IO时,生成大量线程并不能真正提高性能。异步执行IO更多地是关于管理资源和避免阻塞。 - sara
1
@kai 同意;但我假设Process()方法需要相当长的时间。 - Matthew Watson
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接