Parallel.ForEach是否限制活跃线程数?

121

考虑以下代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});
所有1000个线程是否几乎同时生成?
6个回答

167
不,它不会启动1000个线程-是的,它会限制使用的线程数量。Parallel Extensions根据您实际拥有的核心数以及已经繁忙的核心数来使用适当数量的核心。它为每个核心分配工作,然后使用一种称为"工作窃取"的技术让每个线程高效地处理自己的队列,并且只需要在真正需要时进行任何昂贵的跨线程访问。
请查看PFX Team Blog,了解有关如何分配工作和各种其他主题的大量信息。
请注意,在某些情况下,您也可以指定要使用的并行度。

5
今晚我使用了Parallel.ForEach(FilePathArray, path =>...)读取约24,000个文件,每读入一个文件就创建一个新文件。这段代码非常简单。但即使使用了6个线程,也足以使我从7200 RPM的硬盘中读取数据时达到100%的利用率。在几个小时内,我看到Parallel库创建了超过8,000个线程。我尝试使用MaxDegreeOfParallelism进行测试,结果8000多个线程消失了。我已经进行了多次测试,结果都是一样的。 - Jake Drew
可能会为某个退化的'DoSomething'启动1000个线程。(就像我目前正在处理的生产代码问题一样,未设置限制并生成200多个线程,从而弹出SQL连接池...我建议为任何无法轻松推断为显式CPU绑定的工作设置最大DOP。) - user2864740
“Parallel Extensions使用适当数量的核心,基于您实际拥有的核心数以及已经繁忙的核心数。”-- 您的意思是Parallel.ForEach会考虑到其他进程在同一台机器上对可用核心所施加的负载吗?如果是的话,您是否已经通过实验进行了验证?我对这种说法非常怀疑。 - Theodor Zoulias
@TheodorZoulias:老实说,我不记得13年前我验证了什么了,恐怕是这样的。 - Jon Skeet
好的,没问题。我会假设这个答案包含在PFX博客中找到的信息。顺便说一句,我很惊讶这个答案中的链接在13年后仍然没有失效! - Theodor Zoulias

37
在单核机器上,Parallel.ForEach将工作的集合分区(chunk)到多个线程之间,但是它使用的线程数量是基于一种算法计算出来的。该算法考虑并似乎不断监视着分配给ForEach的线程所完成的工作量。因此,如果ForEach的主体部分调用长时间运行的IO绑定/阻塞功能,这将使线程处于等待状态,该算法将会启动更多线程,并在它们之间重新分配集合。如果线程快速完成并且不会阻塞例如IO线程,例如仅计算一些数字,则该算法将升级(或降级)线程数,以达到算法认为最适合吞吐量(每个迭代的平均完成时间)的点。
基本上,在所有不同Parallel库函数背后的线程池中,将计算最优线程数。物理处理器核心数只是方程式的一部分。线程生成的数量与核心数之间没有简单的一对一关系。
我发现有关取消和处理同步线程的文档并不是很有帮助。希望微软可以在MSDN中提供更好的示例。
不要忘记,必须编写能够在多个线程上运行的代码,以及所有常规的线程安全问题,框架并未抽象化该因素...但是。

1
如果ForEach的主体部分调用了长时间阻塞的函数,这将使线程等待,算法将会生成更多的线程。在极端情况下,这意味着可能会创建与ThreadPool允许的数量一样多的线程。 - user2864740
3
没错,就我自己的调试经验而言,对于 IO 操作可能会分配超过 100 个线程。 - FindOutIslamNow

13

很好的问题。在您的示例中,即使在四核处理器上并行化水平也很低,但是通过等待,可以实现相当高的并行化水平。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

现在看看当添加一个等待操作来模拟HTTP请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我目前还没有进行任何更改,但并发/并行级别已经大幅跳升。可以使用 ParallelOptions.MaxDegreeOfParallelism 来增加并发的限制。

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我建议设置ParallelOptions.MaxDegreeOfParallelism。它不一定会增加正在使用的线程数,但它会确保你只启动合理数量的线程,这似乎是你关心的问题。

最后回答你的问题,不,你不会让所有线程同时启动。如果你想完美地并行调用(例如测试竞态条件),请使用 Parallel.Invoke。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}

4

根据处理器/核心数量,它会计算出最优线程数。它们不会全部同时启动。


4

0

Parallel.ForEach 会限制活动线程的数量吗?

如果您没有使用正数的 MaxDegreeOfParallelism 配置 Parallel.ForEach,那么答案是否定的。在默认配置下,并假设具有足够大的大小的 source 序列,Parallel.ForEach 将使用立即可用于 ThreadPool 中的所有线程,并将不断请求更多线程。仅靠自身,Parallel.ForEach 对线程数量没有任何限制。它只受关联的 TaskScheduler 的能力限制。

如果你想了解未配置的Parallel.ForEach的行为,你必须知道ThreadPool的行为。这很简单,可以用一段话来描述。 ThreadPool立即满足所有工作请求,通过启动新线程,最多达到默认值Environment.ProcessorCount的软限制。当达到此限制时,进一步的请求将被排队,每秒创建一个新线程以满足需求¹。在我的机器上,线程数量还有一个硬限制,是32,767个线程。可以使用ThreadPool.SetMinThreads方法配置软限制。如果没有排队的工作,则ThreadPool也会以大约相同的速率(每秒1个)退役线程。

以下是一个实验性演示,展示了Parallel.ForEach如何使用ThreadPool中的所有可用线程。可用线程数通过ThreadPool.SetMinThreads进行预先配置,然后Parallel.ForEach开始运行并占用所有线程:
ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 10);
HashSet<Thread> threads = new();
int concurrency = 0;
int maxConcurrency = 0;
Parallel.ForEach(Enumerable.Range(1, 1500), n =>
{
    lock (threads) maxConcurrency = Math.Max(maxConcurrency, ++concurrency);
    lock (threads) threads.Add(Thread.CurrentThread);
    // Simulate a CPU-bound operation that takes 200 msec
    Stopwatch stopwatch = Stopwatch.StartNew();
    while (stopwatch.ElapsedMilliseconds < 200) { }
    lock (threads) --concurrency;
});
Console.WriteLine($"Number of unique threads: {threads.Count}");
Console.WriteLine($"Maximum concurrency: {maxConcurrency}");

输出(等待约5秒后):

Number of unique threads: 102
Maximum concurrency: 102

在线演示

completionPortThreads 的数量对于这个测试而言是不相关的。 Parallel.ForEach 使用被指定为“workerThreads”的线程。这 102 个线程如下:

  • 立即按需创建的 100 个 ThreadPool 线程。
  • 在 1 秒延迟后注入的另一个 ThreadPool 线程。
  • 控制台应用程序的主线程。

¹ ThreadPool 的注入速率未记录。截至 .NET 7,它是每秒钟一个线程,但这可能会在未来的 .NET 版本中发生改变。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接