为什么Parallel.ForEach比AsParallel().ForAll()快得多,即使MSDN建议相反?

34
我进行了一些调查,看看如何创建一个运行在树结构中的多线程应用程序。
为了找到最佳实现方法,我创建了一个测试应用程序,它遍历我的C:\磁盘并打开所有目录。
class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunction(startDirectory);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '\\');
        //Console.WriteLine(depth + ": " + currentDirectory);

        try
        {
            var children = Directory.GetDirectories(currentDirectory);

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }

        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

然而我遇到的问题是,当以模式3(Parallel.ForEach)运行时,代码完成时间约为2.5秒(是的,我的固态硬盘很快;))。未使用并行化运行代码时,完成时间约为8秒。以模式2(AsParallel.ForAll())运行代码时,需要几乎无限长的时间。

在进程资源管理器中查看时,我还遇到了一些奇怪的事实:

Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds

Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???

Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

我发现特别奇怪的是,Parallel.ForEach似乎忽略了任何仍在运行的父线程/任务,而AsParallel().ForAll()似乎会等待前一个任务完成(这不会很快,因为所有父任务都在等待它们的子任务完成)。
此外,我在MSDN上读到的是:“如果可能,优先使用ForAll而不是ForEach”。
来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx 有人知道这可能是为什么吗?
编辑1:
根据Matthew Watson的要求,我首先将树加载到内存中,然后再循环遍历。现在,树的加载是顺序进行的。
然而,结果是相同的。未并行化和Parallel.ForEach现在可以在约0.05秒内完成整个树,而AsParallel().ForAll仍然只能每秒完成1步。
代码:
class Program
{
    private static DirWithSubDirs RootDir;

    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");


        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunctionInMemory(RootDir);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }        

    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '\\');
        Console.WriteLine(depth + ": " + currentDirectory.Path);

        var children = currentDirectory.SubDirs;

        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;

        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}

class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }

    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

编辑2:

在阅读了Matthew评论中的更新后,我尝试将以下代码添加到程序中:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

然而,这并不会改变AsParallel的执行方式。仍然会在放慢到每秒1步之前立即执行前8个步骤。
(额外说明,我目前正在忽略由于Try Catch块无法访问目录时发生的异常,周围有Directory.GetDirectories())
编辑3:
我主要感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我来说,第二个创建了一个线程,用于它所做的每次递归,而第一个则最多使用约30个线程处理所有内容。 (还有MSDN为什么建议使用AsParallel,即使它使用了约1秒的超时创建了这么多线程)
编辑4:
我发现另一件奇怪的事情: 当我尝试将Thread pool上的MinThreads设置为1023以上时,它似乎会忽略该值并缩小到大约8或16: ThreadPool.SetMinThreads(1023, 16);
当我使用1023时,它会非常快地完成前1023个元素,然后回到我一直经历的缓慢步伐。
注意:与整个Parallel.ForEach相比,现在实际上创建了1000多个线程。
这是否意味着Parallel.ForEach在处理任务方面更加聪明?
一些更多的信息,此代码在将值设置为1023以上时会打印两次8-8:(当您将值设置为1023或更低时,它会打印正确的值)
        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);

        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);

        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

编辑5:

根据迪恩的要求,我创建了另一个案例来手动创建任务:

case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

这个方法的速度和Parallel.ForEach()循环一样快。所以我们仍然没有答案,为什么AsParallel().ForAll()会慢那么多。


通过 ThreadPool.SetMinThreads(4000, 4000);,你将IO完成端口线程设置为一个疯狂的数字。尝试使用 ThreadPool.SetMinThreads(4000, 16); 替代(对于 SetMaxThreads() 同样适用)。 - Matthew Watson
我已经完成了这个,但不知何故仍然遇到相同的结果。对于模式2,我在资源监视器中每秒钟看到1个额外的线程弹出。当我启用Console.WriteLine时,我看到它以每秒约1级的速度通过我的磁盘移动。模式1和3仍然可以在少于1秒的时间内执行整个磁盘(80.173个元素)(内存中的一个)。 - Devedse
@Devedse,我认为它仍然在启动时启动了所有线程。我还没有完全考虑清楚,但我认为你会在for循环内部放置某种等待,以便循环在递归完成之前不会继续。当它沿着目录树向下走时,您将获得很多深度和可能有几个线程,但至少它不会一次性创建所有线程。您还可以使用Thread.Current.ID来调查线程,也可以记录每种模型创建了多少线程。 - Dean
@Dean,我知道它确实可以这样做,但这就是整个问题所在。用那种方式不会减慢速度。它非常快。只有模式2(使用AsParallel().ForAll())才会减缓到每秒1步。 - Devedse
@Devedse 如果你想在非CPU绑定任务中使用Parallel.Foreach,请使用MaxDegreeOfParallelism = num_of_cores,它可以节省资源并提高速度。在AsParallel.ForAll中,它警告不要将其与IO绑定任务一起使用。 - Pedro.The.Kid
显示剩余6条评论
4个回答

51

这个问题相当容易调试,当你遇到线程问题时,这是一个不常见的奢侈品。您在此处使用的基本工具是“调试 > 窗口 > 线程”调试器窗口。它会显示活动线程,并让您一窥其堆栈跟踪。一旦出现缓慢,您将轻松地看到有数十个活动线程都被卡住了。它们的堆栈跟踪都看起来相同:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

每当你看到像这样的情况,你应该立即想到消防栓问题。 这可能是线程中第三个最常见的错误,排在竞争和死锁之后。

现在你知道了原因,代码的问题在于每个完成的线程都会添加N个线程。其中N是目录中子目录的平均数。实际上,线程数量呈指数增长,这总是不好的。只有当N = 1时,它才能保持控制,当然在典型的磁盘上从来不会发生。

请注意,就像几乎所有线程问题一样,这种行为往往重复得很差。你的机器上的固态硬盘 tend to hide it. 在你的机器上的RAM也一样,程序可能会很快、无故障地完成第二次运行。因为你现在将从文件系统缓存而不是磁盘中读取数据,速度很快。调整ThreadPool.SetMinThreads()同样可以隐藏它,但无法解决它。它永远不能解决任何问题,只能隐藏它们。因为无论发生什么,指数数字始终会压倒设置的最小线程数。您只能希望它在发生这种情况之前完成迭代驱动器。对于拥有大型硬盘的用户来说,这只是无聊的希望。

现在也许可以轻松解释ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别了。你可以从堆栈跟踪中看出ForAll()做了一些不好的事情,RunSynchronously()方法会阻塞直到所有线程完成。 阻塞是线程池线程不应该做的事情,它会使线程池混乱并且不允许其调度另一个作业的处理器。而且会产生你观察到的效果,线程池很快会被等待其他线程完成的N个线程淹没。但实际上它们正在池中等待,并没有被调度,因为已经有太多的它们处于活动状态。

这是死锁方案,是一个相当常见的问题,但线程池管理器对此有一个解决方法。它监视活动的线程池线程,并在它们没有及时完成时介入。然后允许启动一个额外的线程,比SetMinThreads()设置的最小线程数多一个。但不要超过SetMaxThreads()设置的最大数量,如果活动tp线程太多,就会有风险并可能触发OOM。这确实解决了死锁问题,使其中一个ForAll()调用完成。但这个过程的速度非常慢,线程池每秒只执行两次此操作。在它追上之前,你会失去耐心。

Parallel.ForEach()没有这个问题,它不会阻塞,因此不会淹没池。

这似乎是解决方案,但请记住,你的程序仍然正在消耗你机器的内存,不断向池中添加等待的tp线程。这也可能导致程序崩溃,只是不太可能发生,因为你有很多内存,而线程池并不会使用太多内存来跟踪请求。然而,一些程序员也可以通过


7
首先要注意的是,您正在尝试并行化一个IO限制操作,这将显着扭曲时间。
其次要注意的是并行任务的性质:您正在递归地下降目录树。如果创建多个线程来执行此操作,则每个线程可能同时访问磁盘的不同部分-这将导致磁盘读头跳来跳去,严重减慢速度。
尝试更改测试以创建内存中的树,并使用多个线程访问该树。然后,您将能够适当比较时间,而不会使结果扭曲到无法使用。
此外,您可能正在创建大量线程,它们(默认情况下)将成为线程池线程。当线程超过处理器核心数时,拥有大量线程实际上会减慢速度。
还要注意,当您超过线程池最小线程数(由ThreadPool.GetMinThreads()定义)时,线程池管理器在每个新线程池线程创建之间引入了延迟。(我认为每个新线程大约需要0.5秒)。
此外,如果线程数超过 ThreadPool.GetMaxThreads() 返回的值,创建线程将会阻塞,直到其中一个线程退出。我认为这很可能是发生的情况。
您可以通过调用 ThreadPool.SetMaxThreads()ThreadPool.SetMinThreads() 来增加这些值,以测试这个假设,并查看是否有任何区别。
(最后,请注意,如果您真的试图从 C:\ 递归下降,当它到达受保护的操作系统文件夹时,几乎肯定会收到 IO 异常。)
注意:设置最大/最小线程池线程如下所示:
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

跟进

我已经按照上述方式设置了线程池线程计数,并尝试了您的测试代码,结果如下(未在我的整个C:\驱动器上运行,而是在较小的子集上运行):

  • 模式1花费了06.5秒。
  • 模式2花费了15.7秒。
  • 模式3花费了16.4秒。

这符合我的预期;添加一堆线程来执行此操作实际上比单线程更慢,而两种并行方法花费的时间大致相同。


如果有其他人想要调查这个问题,这里提供一些确定性测试代码(因为我们不知道他的目录结构,所以无法重现OP的代码)。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static DirWithSubDirs RootDir;

        private static void Main()
        {
            Console.WriteLine("Loading file system into memory...");
            RootDir = new DirWithSubDirs("Root", 4, 4);
            Console.WriteLine("Done");

            //ThreadPool.SetMinThreads(4000, 16);
            //ThreadPool.SetMaxThreads(4000, 16);

            var w = Stopwatch.StartNew();
            ThisIsARecursiveFunctionInMemory(RootDir);

            Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
            Console.ReadKey();
        }

        public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
        {
            var depth = currentDirectory.Path.Count(t => t == '\\');
            Console.WriteLine(depth + ": " + currentDirectory.Path);

            var children = currentDirectory.SubDirs;

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunctionInMemory(child);
                    }
                    break;

                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                default:
                    break;
            }
        }
    }

    internal class DirWithSubDirs
    {
        public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();

        public String Path { get; private set; }

        public DirWithSubDirs(String path, int width, int depth)
        {
            this.Path = path;

            if (depth > 0)
                for (int i = 0; i < width; ++i)
                    SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1));
        }
    }
}

是的,文件系统已经针对顺序访问进行了优化 - 这样会更快。 - user1023602
请看我的更新,我已经改变了树的加载方式,先加载到内存中,但结果仍然相同。 - Devedse
1
我主要的问题是为什么Parallel.ForEach能够在0.03秒内完成这段代码,而AsParallel().ForAll()需要几天时间(加上每个递归级别创建一个额外的线程)。而且为什么MSDN建议相反? - Devedse
我觉得这很奇怪。我已经添加了完全相同的代码,但是模式2仍然需要几乎无限长的时间。(仍然只能一步一步地进行) - Devedse
@Devedse 我注意到您为此开了一份悬赏,但事实仍然是我无法重现您的结果。问题是:还有其他人可以吗? - Matthew Watson
显示剩余3条评论

4

Parallel.For和.ForEach方法在内部实现上等同于在任务中运行迭代,例如像这样的循环:

Parallel.For(0, N, i => 
{ 
  DoWork(i); 
});

等同于:

var tasks = new List<Task>(N); 
for(int i=0; i<N; i++) 
{ 
tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i)); 
} 
Task.WaitAll(tasks.ToArray());

从每个迭代的角度来看,可能与每个其他迭代并行运行,这是一种可以接受的“心理”模型,但在现实中并不会发生。事实上,并行不一定使用每次迭代一个任务,因为这比必要的开销大得多。Parallel.ForEach尝试使用完成循环所需的最少任务数,以尽快地完成循环。它会启动任务作为线程可用于处理这些任务,并且每个任务参与管理方案(我认为它被称为分块):任务请求完成多个迭代,获得它们,然后处理该工作,然后返回获取更多。块大小根据参与的任务数量、机器负载等因素而变化。
PLINQ的.AsParallel()有不同的实现,但它仍然可以类似地将多个迭代提取到临时存储中,在线程中进行计算(但不是作为任务),并将查询结果放入小缓冲区中。(您会得到基于ParallelQuery的东西,然后进一步的.Whatever()函数绑定到提供并行实现的另一组扩展方法)。
现在我们对这两种机制的工作方式有了一点了解,我将尝试回答您最初的问题:
那么为什么.AsParallel()比Parallel.ForEach慢呢?原因在于下面。任务(或其等效实现)不会阻塞类似于I/O的调用。它们“等待”并释放CPU以执行其他操作。但是(引用C# nutshell book):“PLINQ无法执行I/O绑定工作而不阻塞线程”。这些调用是同步的。它们的编写意图是,如果(仅当)您正在执行诸如下载Web页面之类的任务而不占用CPU时间,则增加并行度。
您的函数调用与I/O绑定调用完全类似的原因是:其中一个线程(称为T)会阻塞并且在其所有子线程完成之前不做任何事情,这可能是一个缓慢的过程。T本身在等待子进程解除阻塞时不是CPU密集型,它除了等待什么也不做。因此,它与典型的I/O绑定函数调用相同。

让我补充一下,如果你真的在编写磁盘树解析器,我完全同意Hans Passant在他的帖子中的观点。虽然从CPU的角度来看,它可能看起来是一个I/O绑定的过程,但磁盘一次只能处理一个请求,而你将会同时发送数十个读取请求。如果你正在做类似下载网页的事情,那么每个网页都将由不同的服务器提供服务,这还好。但你实际上正在对你的磁盘进行拒绝服务攻击。 - Dean
嗨,迪恩,你能看一下我在汉斯的回答下留下的评论吗?简而言之,我想知道如果保持得很小,是否仍然可以从某种程度的并行性中受益,而不会使存储遭受拒绝服务攻击。 - Marc.2377

0
基于AsParallel如何工作的接受答案.AsParallel.ForAll()在调用.ForAll()之前会将其转换回IEnumerable,因此它会创建1个新线程+ N个递归调用(每个调用都会生成一个新线程)。

我觉得奇怪的是Parallel.ForEach并没有这样做,这使其快了很多。(使用AsParallel.ForAll每个节点需要1秒钟,因此对于100000个节点,完成需要超过一天),而与之相比,Parallel.ForEach只需0.03秒。 - Devedse
1
我想这是因为Parallel.ForEach具有完全的迭代控制。也许它只是写得更好? - user1023602

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接