Scala并行集合的并行度

39

在Scala的并行集合中,是否有类似于LINQ的 withDegreeOfParallelism 的等价功能,可以设置运行查询的线程数?我想要并行地运行一个需要固定数量线程的操作。

2个回答

60

使用最新的trunk版本,使用JVM 1.6或更高版本,请使用:

collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(parlevel: Int)
这可能会在未来发生变化。计划在未来的版本中采取更统一的方式来配置所有Scala任务并行API。
请注意,虽然这将确定查询使用的处理器数量,但这可能不是运行查询所涉及的实际线程数。由于并行集合支持嵌套并行性,如果检测到必要情况,实际的线程池实现可能会分配更多线程来运行查询。
编辑:
从Scala 2.10开始,设置并行级别的首选方法是通过将tasksupport字段设置为新的TaskSupport对象,例如以下示例:
scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)

scala> pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a

scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

在使用fork join池实例化ForkJoinTaskSupport对象时,必须将fork join池的并行度级别设置为所需值(例如示例中的2)。


6

无论使用哪个JVM版本,使用Scala 2.9+(引入了并行集合)您也可以使用grouped(Int)par函数的组合来在小块上执行并行任务,例如:

scala> val c = 1 to 5
c: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)

scala> c.grouped(2).seq.flatMap(_.par.map(_ * 2)).toList
res11: List[Int] = List(2, 4, 6, 8, 10)

grouped(2)函数将集合按长度为2或更小的块进行分组,seq函数确保这些块不是并行的(在此示例中无用),然后在使用par创建的小型并行块上执行_ * 2函数,从而确保最多只有2个线程并行执行。

但是,这种方法可能比设置工作池参数略微低效,我对此不确定。


我对这是否真的会给你带来任何好处持怀疑态度。我需要看到证明它的基准数字。 - Seth Tisue

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接