Parallel.ForEach比普通的foreach慢

3

我正在尝试使用Parallel.ForEach在C#控制台应用程序中进行操作,但似乎无法得到正确结果。我创建了一个随机数数组,并且使用了顺序foreach和Parallel.ForEach来查找数组中的最大值。在大约相同的C++代码中,当数组中有3M个值时,使用多个线程开始出现折衷方案。但是,在有100M个值的情况下,Parallel.ForEach比顺序foreach慢两倍。我做错了什么?

class Program
{
    static void Main(string[] args)
    {
        dostuff();

    }

    static void dostuff() {
        Console.WriteLine("How large do you want the array to be?");
        int size = int.Parse(Console.ReadLine());

        int[] arr = new int[size];
        Random rand = new Random();
        for (int i = 0; i < size; i++)
        {
            arr[i] = rand.Next(0, int.MaxValue);
        }

        var watchSeq = System.Diagnostics.Stopwatch.StartNew();
        var largestSeq = FindLargestSequentially(arr);
        watchSeq.Stop();
        var elapsedSeq = watchSeq.ElapsedMilliseconds;
        Console.WriteLine("Finished sequential in: " + elapsedSeq + "ms. Largest = " + largestSeq);

        var watchPar = System.Diagnostics.Stopwatch.StartNew();
        var largestPar = FindLargestParallel(arr);
        watchPar.Stop();
        var elapsedPar = watchPar.ElapsedMilliseconds;
        Console.WriteLine("Finished parallel in: " + elapsedPar + "ms Largest = " + largestPar);

        dostuff();
    }

    static int FindLargestSequentially(int[] arr) {
        int largest = arr[0];
        foreach (int i in arr) {
            if (largest < i) {
                largest = i;
            }
        }
        return largest;
    }

    static int FindLargestParallel(int[] arr) {
        int largest = arr[0];
        Parallel.ForEach<int, int>(arr, () => 0, (i, loop, subtotal) =>
        {
            if (i > subtotal)
                subtotal = i;
            return subtotal;
        },
        (finalResult) => {
            Console.WriteLine("Thread finished with result: " + finalResult);
            if (largest < finalResult) largest = finalResult;
        }
        );
        return largest;
    }
}

我将并行执行放入了一个5次循环中,对于5亿次的执行时间变化非常大。可能是100毫秒或10秒。 - Christoph
1
你在以调试模式运行代码吗?依我的经验,当 VS 调试器连接时,并行方法的运行速度会变得非常缓慢。尝试使用 Release 模式进行构建,并从生成的 exe 文件启动程序,而不是从 VS 启动。 - Mateusz Krzaczek
1
每个Parallel.ForEach都会启动自己的任务,因此它会降低性能。相反,您应该考虑使用Range Partitioner来分块处理工作。建议使用2 * Environment.ProcessorCount作为块大小。请参见https://msdn.microsoft.com/en-us/library/system.collections.concurrent.partitioner(v=vs.110).aspx。 - Rick Davin
在500M时,我会遇到内存不足的异常,但是使用100M运行时,顺序循环需要65毫秒,而并行循环的时间则从3900毫秒(使用大量线程)到280毫秒(使用10-15个线程)不等。在Visual Studio之外的发布模式下。 - Kristoffer Berge
我同意@DeX3r的观点,这可能与诸如调试会话之类的环境因素有关。例如,运行此LINQPad脚本显示并行比顺序略快。此LINQPad脚本 - StriplingWarrior
5个回答

7

拥有非常小的代表团体可能会对性能产生影响。

我们可以通过分区来实现更好的性能。在这种情况下,代表团体的主体承担大量数据工作。

static int FindLargestParallelRange(int[] arr)
{
    object locker = new object();
    int largest = arr[0];
    Parallel.ForEach(Partitioner.Create(0, arr.Length), () => arr[0], (range, loop, subtotal) =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
            if (arr[i] > subtotal)
                subtotal = arr[i];
        return subtotal;
    },
    (finalResult) =>
    {
        lock (locker)
            if (largest < finalResult)
                largest = finalResult;
    });
    return largest;
}

请注意同步本地Finally委托。还需注意正确初始化localInit:() => arr[0]而不是() => 0

使用PLINQ进行分区:

static int FindLargestPlinqRange(int[] arr)
{
    return Partitioner.Create(0, arr.Length)
        .AsParallel()
        .Select(range =>
        {
            int largest = arr[0];
            for (int i = range.Item1; i < range.Item2; i++)
                if (arr[i] > largest)
                    largest = arr[i];
            return largest;
        })
        .Max();
}

我强烈推荐由Stephen Toub所著的免费电子书《并行编程模式》(Patterns of Parallel Programming)


有趣的是,arr.AsParallel().Max() 似乎甚至比这个分区策略表现更好。请参见我的答案以获取我用于测试的 LINQPad 脚本的链接。 - StriplingWarrior

2
其他回答者已经提到,您试图对每个项目执行的操作非常微不足道,以至于有许多其他因素比您实际进行的工作更重要。这些可能包括:
  • JIT 优化
  • CPU 分支预测
  • I/O(在计时器运行时输出线程结果)
  • 调用委托的成本
  • 任务管理的成本
  • 系统错误地猜测什么线程策略最优
  • 内存 / CPU 缓存
  • 内存压力
  • 环境(调试)
  • 等等。
单次运行每种方法并不是测试的充分方式,因为它使得上述因素中的某些因素在一个迭代中比另一个迭代更加重要。您应该从更强大的基准测试策略开始。
此外,您的实现实际上是非常错误的。 文档 明确表示:
“localFinally”委托会针对每个任务调用一次,以执行每个任务的本地状态的最终操作。该委托可能会在多个任务上同时调用;因此,您必须同步访问任何共享变量。
您尚未同步最终委托,因此您的函数容易出现竞争条件,从而导致产生不正确的结果。
与大多数情况一样,最好的方法是利用比我们聪明的人所做的工作。在我的测试中,以下方法似乎是总体上最快的:
return arr.AsParallel().Max();

1
非常有趣。在 LinqPad 和 VS 中结果是不同的。 - Alexander Petrov

1
并行Foreach循环的运行速度应该较慢,因为所使用的算法不是并行的,需要更多的工作来运行此算法。
在单线程中,要找到最大值,我们可以将第一个数字作为最大值,并将其与数组中的每个其他数字进行比较。如果有一个数字大于我们的第一个数字,我们交换并继续。这样我们只访问了数组中的每个数字一次,总共进行了N次比较。
在上面的并行循环中,该算法会创建开销,因为每个操作都包装在一个带有返回值的函数调用中。因此除了进行比较之外,还需要运行添加和删除这些调用到调用堆栈的开销。此外,由于每个调用都依赖于前一个函数调用的值,它需要按顺序运行。
在下面的并行For循环中,数组被分成由变量threadNumber确定的明确数量的线程。这将将函数调用的开销限制在较低的数量。
请注意,对于较小的值,并行循环的执行速度较慢。但是,对于1亿个值,时间减少了。
static int FindLargestParallel(int[] arr)
{
    var answers = new ConcurrentBag<int>();
    int threadNumber = 4;

    int partitionSize = arr.Length/threadNumber;
    Parallel.For(0, /* starting number */
        threadNumber+1, /* Adding 1 to threadNumber in case array.Length not evenly divisible by threadNumber */
        i =>
        {
            if (i*partitionSize < arr.Length) /* check in case # in array is divisible by # threads */
            {
                var max = arr[i*partitionSize];
                for (var x = i*partitionSize; 
                    x < (i + 1)*partitionSize && x < arr.Length;
                    ++x)
                {
                    if (arr[x] > max)
                        max = arr[x];
                }
                answers.Add(max);
            }
        });

    /* note the shortcut in finding max in the bag */    
    return answers.Max(i=>i);
}

0
这里有一些想法:在并行情况下,涉及到线程管理逻辑来确定要使用多少个线程。这个线程管理逻辑可能在主线程上运行。每次一个线程返回新的最大值时,管理逻辑就会启动并确定下一个工作项(即在数组中要处理的下一个数字)。我很确定这需要某种形式的锁定。无论如何,确定下一个项甚至可能比执行比较操作本身更耗费时间。
对我来说,这听起来比一个接一个地处理一个数字的单线程工作量要大得多(开销更大)。在单线程情况下,有许多优化策略可以发挥作用:没有边界检查,CPU可以将数据加载到CPU内部的第一级缓存中等等。不确定这些优化策略中哪些适用于并行情况。
请记住,在典型的桌面机器上,只有2到4个物理CPU核心可用,因此实际在工作的核心数永远不会超过那个范围。因此,如果并行处理的开销比单线程操作的2-4倍还要多,则并行版本将不可避免地更慢,您也正在观察到这一点。
你尝试在32核的机器上运行过吗? ;-)
一个更好的解决方案是确定覆盖整个数组的非重叠范围(起始+停止索引),并让每个并行任务处理一个范围。这样,每个并行任务可以在内部执行紧密的单线程循环,并且只有在整个范围被处理完毕后才返回。您甚至可以根据机器的逻辑核心数量确定接近最优的范围数。我没有尝试过这个方法,但我相信您会看到比单线程情况下更好的效果。

我以前从未看过这个Parallel.ForEach()重载,但它听起来正是你在最后一段中建议的。它将工作分配给一组线程,按顺序运行主体委托,然后使用localFinally委托有效地合并结果。 - StriplingWarrior
1
Paul Tsai实现了我在上一个段落中描述的精确内容。它可能是并行逻辑在内部将数组分成范围,每个线程负责一个范围。然而,在您的实现中,仍意味着对于每个元素,都会有一个调用并行代理方法。在Paul的实现中,他在并行代理方法中运行一个循环,迭代其分配的范围。 - Christoph

0

尝试将集合分成批次并并行运行这些批次,其中批次的数量与您的 CPU 核心数相对应。 我使用以下方法运行了一些方程式 1K、10K 和 1M 次:

  1. 使用 "for" 循环。
  2. 使用 System.Threading.Tasks 库中的 "Parallel.For" 在整个集合上进行循环。
  3. 使用 "Parallel.For" 在 4 个批次上进行循环。
  4. 使用 System.Threading.Tasks 库中的 "Parallel.ForEach" 在整个集合上进行循环。
  5. 使用 "Parallel.ForEach" 在 4 个批次上进行循环。

结果:(以秒为单位测量)

enter image description here

结论:
在处理超过10,000条记录的情况下,使用“Parallel.ForEach”并行处理批次会产生最佳结果。我认为分批处理有帮助,因为它利用了所有CPU核心(例如,在这个例子中有4个核心),同时最小化了与并行化相关的线程开销。

以下是我的代码:

        public void ParallelSpeedTest()
    {
        var rnd = new Random(56);
        int range = 1000000;
        int numberOfCores = 4;
        int batchSize = range / numberOfCores;
        int[] rangeIndexes = Enumerable.Range(0, range).ToArray();
        double[] inputs = rangeIndexes.Select(n => rnd.NextDouble()).ToArray();
        double[] weights = rangeIndexes.Select(n => rnd.NextDouble()).ToArray();
        double[] outputs = new double[rangeIndexes.Length];

        /// Series "for"...
        var startTimeSeries = DateTime.Now;
        for (var i = 0; i < range; i++)
        {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        }
        var durationSeries = DateTime.Now - startTimeSeries;

        /// "Parallel.For"...
        var startTimeParallel = DateTime.Now;
        Parallel.For(0, range, (i) => {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        });
        var durationParallelFor = DateTime.Now - startTimeParallel;

        /// "Parallel.For" in Batches...
        var startTimeParallel2 = DateTime.Now;
        Parallel.For(0, numberOfCores, (c) => {
            var endValue = (c == numberOfCores - 1) ? range : (c + 1) * batchSize;
            var startValue = c * batchSize;
            for (var i = startValue; i < endValue; i++)
            {
                outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
            }
        });
        var durationParallelForBatches = DateTime.Now - startTimeParallel2;

        /// "Parallel.ForEach"...
        var startTimeParallelForEach = DateTime.Now;
        Parallel.ForEach(rangeIndexes, (i) => {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        });
        var durationParallelForEach = DateTime.Now - startTimeParallelForEach;

        /// Parallel.ForEach in Batches...
        List<Tuple<int,int>> ranges = new List<Tuple<int, int>>();
        for (var i = 0; i < numberOfCores; i++)
        {
            int start = i * batchSize;
            int end = (i == numberOfCores - 1) ? range : (i + 1) * batchSize;
            ranges.Add(new Tuple<int,int>(start, end));
        }
        var startTimeParallelBatches = DateTime.Now;
        Parallel.ForEach(ranges, (range) => {
            for(var i = range.Item1; i < range.Item1; i++) {
                outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
            }
        });
        var durationParallelForEachBatches = DateTime.Now - startTimeParallelBatches;

        Debug.Print($"=================================================================");
        Debug.Print($"Given: Set-size: {range}, number-of-batches: {numberOfCores}, batch-size: {batchSize}");
        Debug.Print($".................................................................");
        Debug.Print($"Series For:                       {durationSeries}");
        Debug.Print($"Parallel For:                 {durationParallelFor}");
        Debug.Print($"Parallel For Batches:         {durationParallelForBatches}");
        Debug.Print($"Parallel ForEach:             {durationParallelForEach}");
        Debug.Print($"Parallel ForEach Batches:     {durationParallelForEachBatches}");
        Debug.Print($"");
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接