Parallel.For与常规线程的区别

9

我试图理解为什么在以下情况下Parallel.For能够优于许多线程:考虑一批可以并行处理的作业。在处理这些作业时,可能会添加新的工作,需要处理这些新工作。使用Parallel.For的解决方案如下:

var jobs = new List<Job> { firstJob };
int startIdx = 0, endIdx = jobs.Count;
while (startIdx < endIdx) {
  Parallel.For(startIdx, endIdx, i => WorkJob(jobs[i]));
  startIdx = endIdx; endIdx = jobs.Count;
}

这意味着Parallel.For在多次同步时需要同步。考虑一个广度优先图算法; 同步次数将非常大,浪费时间,不是吗?尝试使用老式的线程方法:
var queue = new ConcurrentQueue<Job> { firstJob };
var threads = new List<Thread>();
var waitHandle = new AutoResetEvent(false);
int numBusy = 0;
for (int i = 0; i < maxThreads; i++) 
  threads.Add(new Thread(new ThreadStart(delegate {
    while (!queue.IsEmpty || numBusy > 0) {
      if (queue.IsEmpty)
        // numbusy > 0 implies more data may arrive
        waitHandle.WaitOne();

      Job job;
      if (queue.TryDequeue(out job)) {
        Interlocked.Increment(ref numBusy);
        WorkJob(job); // WorkJob does a waitHandle.Set() when more work was found
        Interlocked.Decrement(ref numBusy);
      }
    }
    // others are possibly waiting for us to enable more work which won't happen
    waitHandle.Set(); 
})));
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
Parallel.For 的代码当然更加简洁,而且我无法理解的是,它甚至更快!任务调度程序真的那么好吗?同步被消除了,没有繁忙等待,然而线程方法始终较慢(对我来说)。发生了什么?线程方法能否变得更快?
编辑:感谢所有答案,我希望我可以选择多个答案。我选择采用一个也展示了实际可能改进的答案。

1
如果已经有更干净、更快的解决方案,为什么还要试图让它变得更快呢? - iMortalitySX
因为有一个明显的缺陷可以被消除,我认为。 - Frank Razenberg
2
你应该意识到这两段代码完全不同,简直是天壤之别。一方面,你有一个并行循环遍历静态、已知数量的作业列表(在startIdx和endIdx之间),而另一方面,你有一组线程竞速队列和可等待事件,其中夹杂着一些交错操作。换句话说,线程代码就是一个"缓存失效的典范"(来自http://www.aristeia.com/TalkNotes/ACCU2011_CPUCaches.pdf)。 - Remus Rusanu
3个回答

14

这两个代码示例并不完全相同。

Parallel.ForEach()会使用有限数量的线程并重复使用它们。第二个示例需要创建许多线程,所以启动时间已经比较慢。

maxThreads的值是多少?在Parallel.ForEach()中非常关键,它是动态的。

任务调度程序就是那么好吗?

任务调度程序还是很不错的。TPL使用工作窃取和其他自适应技术。您很难做得更好。


线程示例还会重复使用它创建的线程。它只启动有限数量的线程,而不是为每个作业启动一个线程,如果您是这个意思的话。 - Frank Razenberg
抢我话了,线程池的使用与否。https://dev59.com/nnVC5IYBdhLWcg3woStW - Justin Harvey
@Henk:我想我会把它设置在2到10之间。 - Frank Razenberg

4
Parallel.For并不会将项目分解为单个的工作单位。它根据计划使用的线程数和要执行的迭代次数提前分解所有工作。然后每个线程同步处理该批次(可能使用工作窃取或在接近结尾时保存一些额外的项目以进行负载平衡)。通过使用这种方法,工作线程实际上几乎永远不会互相等待,而使用重度同步的线程则因在每个迭代之前/之后使用重度同步而不断地等待对方。
此外,由于它使用线程池线程,它需要的许多线程可能已经创建,这是其另一个优点。
至于同步,Parallel.For的整个重点在于所有迭代都可以并行完成,因此几乎不需要进行同步(至少在他们的代码中)。
然后当然还有线程数的问题。线程池具有许多非常好的算法和启发式方法,可以帮助确定需要多少线程(在那个瞬间),基于当前硬件、来自其他应用程序的负载等。你可能使用了太多或太少的线程。
此外,由于在开始之前不知道你拥有的项目数量,我建议使用Parallel.ForEach而不是几个Parallel.For循环。它专门设计用于你所处的情况,因此它的启发式方法将更好地适用。(它还可以使代码更清晰。)
BlockingCollection<Job> queue = new BlockingCollection<Job>();

//add jobs to queue, possibly in another thread
//call queue.CompleteAdding() when there are no more jobs to run

Parallel.ForEach(queue.GetConsumingEnumerable(),
    job => job.DoWork());

实际上,这种方法似乎是不可能的,因为你不知道何时调用 queue.CompleteAdding()。只有在队列既为空又没有人在处理更多项时才能这样做。 - Frank Razenberg
@FrankRazenberg 不需要等待它为空或没有更多的项目正在被处理,只需在没有更多项目添加时调用“CompleteAdding”。 “BlockingCollection”已经会自动处理这个问题。 “CompleteAdding”只是意味着枚举器不会再向其内部集合添加任何项目,因此当它最终吐出最后一个项目时,它应该会中断,而不是阻塞并等待更多项目。 - Servy
但是你怎么知道什么时候/在哪里调用CompleteAdding()呢?它只能被调用一次,对吧? - Frank Razenberg
@FrankRazenberg 当您没有更多要添加的项目时,将调用CompleteAdding。看起来您处理作业的方式是添加更多作业;如果是这种情况,我建议尽可能将其重构为生产者和消费者。有一个单独的线程/任务只创建要处理的“Job”项并将它们添加到阻塞集合中,然后我展示的代码可以为每个作业执行实际处理。 - Servy
只有当所有作业都处理完毕后,我们才能知道是否还有更多的工作要做(即我的线程生成方法中的 while 子句)。我想这是不可能的 :( - Frank Razenberg

2
你正在创建许多新线程,而Parallel.For正在使用Threadpool。如果你使用C#线程池,性能会更好,但实际上没有必要这样做。
我建议不要自己开发解决方案;如果有特殊情况需要定制,请使用TPL并进行自定义。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接