考虑以下代码:
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
DoSomething(someString);
});
所有1000个线程是否几乎同时生成?考虑以下代码:
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
DoSomething(someString);
});
所有1000个线程是否几乎同时生成?很好的问题。在您的示例中,即使在四核处理器上并行化水平也很低,但是通过等待,可以实现相当高的并行化水平。
// Max concurrency: 5
[Test]
public void Memory_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
现在看看当添加一个等待操作来模拟HTTP请求时会发生什么。
// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(1000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
我目前还没有进行任何更改,但并发/并行级别已经大幅跳升。可以使用 ParallelOptions.MaxDegreeOfParallelism
来增加并发的限制。
// Max concurrency: 43
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(1000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
// Max concurrency: 391
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(100000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
我建议设置ParallelOptions.MaxDegreeOfParallelism
。它不一定会增加正在使用的线程数,但它会确保你只启动合理数量的线程,这似乎是你关心的问题。
最后回答你的问题,不,你不会让所有线程同时启动。如果你想完美地并行调用(例如测试竞态条件),请使用 Parallel.Invoke。
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
ConcurrentBag<string> monitor = new ConcurrentBag<string>();
ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(DateTime.UtcNow.Ticks.ToString());
monitor.TryTake(out string result);
monitorOut.Add(result);
});
var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
根据处理器/核心数量,它会计算出最优线程数。它们不会全部同时启动。
请参考Does Parallel.For use one Task per iteration?,了解使用的“心理模型”的想法。不过作者确实指出:“最重要的是要记住,具体实现细节可能随时更改。”
Parallel.ForEach 会限制活动线程的数量吗?
如果您没有使用正数的 MaxDegreeOfParallelism
配置 Parallel.ForEach
,那么答案是否定的。在默认配置下,并假设具有足够大的大小的 source
序列,Parallel.ForEach
将使用立即可用于 ThreadPool
中的所有线程,并将不断请求更多线程。仅靠自身,Parallel.ForEach
对线程数量没有任何限制。它只受关联的 TaskScheduler
的能力限制。
ParallelOptions.MaxDegreeOfParallelism
是 -1
,这意味着无限并行。ParallelOptions.TaskScheduler
是 ThreadPoolTaskScheduler
,更为人所知的是 TaskScheduler.Default
。如果你想了解未配置的Parallel.ForEach
的行为,你必须知道ThreadPool
的行为。这很简单,可以用一段话来描述。 ThreadPool
立即满足所有工作请求,通过启动新线程,最多达到默认值Environment.ProcessorCount
的软限制。当达到此限制时,进一步的请求将被排队,每秒创建一个新线程以满足需求¹。在我的机器上,线程数量还有一个硬限制,是32,767个线程。可以使用ThreadPool.SetMinThreads
方法配置软限制。如果没有排队的工作,则ThreadPool
也会以大约相同的速率(每秒1个)退役线程。
Parallel.ForEach
如何使用ThreadPool
中的所有可用线程。可用线程数通过ThreadPool.SetMinThreads
进行预先配置,然后Parallel.ForEach
开始运行并占用所有线程:ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 10);
HashSet<Thread> threads = new();
int concurrency = 0;
int maxConcurrency = 0;
Parallel.ForEach(Enumerable.Range(1, 1500), n =>
{
lock (threads) maxConcurrency = Math.Max(maxConcurrency, ++concurrency);
lock (threads) threads.Add(Thread.CurrentThread);
// Simulate a CPU-bound operation that takes 200 msec
Stopwatch stopwatch = Stopwatch.StartNew();
while (stopwatch.ElapsedMilliseconds < 200) { }
lock (threads) --concurrency;
});
Console.WriteLine($"Number of unique threads: {threads.Count}");
Console.WriteLine($"Maximum concurrency: {maxConcurrency}");
输出(等待约5秒后):
Number of unique threads: 102
Maximum concurrency: 102
在线演示。
completionPortThreads
的数量对于这个测试而言是不相关的。 Parallel.ForEach
使用被指定为“workerThreads
”的线程。这 102 个线程如下:
ThreadPool
线程。ThreadPool
线程。¹ ThreadPool
的注入速率未记录。截至 .NET 7,它是每秒钟一个线程,但这可能会在未来的 .NET 版本中发生改变。
Parallel.ForEach
会考虑到其他进程在同一台机器上对可用核心所施加的负载吗?如果是的话,您是否已经通过实验进行了验证?我对这种说法非常怀疑。 - Theodor Zoulias