如何并行执行多个任务?

6

我参加了课程《并行编程》,其中介绍了并行接口:

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskA
  val tb = task {taskB}
  (ta, tb.join())
}

以下是错误的内容:
def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskB
  val tb = task {taskB}.join()
  (ta, tb)
}

请查看以下链接了解更多接口信息:https://gist.github.com/ChenZhongPu/fe389d30626626294306264a148bd2aa

它还向我们展示了执行四项任务的正确方式:

def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
}

我的问题是:如果我不知道任务数量(一个任务列表),如何正确调用每个任务的 join 方法?

tasks.map(_.join()) // wrong

编辑

类似的讨论也在本周模块的讨论区:并行编程中出现。


注意:map函数返回一个新的集合,其中每个元素都由该函数转换。也许您需要进行转换,而不创建新的集合? - jwvh
@jwvh 怎么做? - chenzhongpu
既然你不打算返回任何东西,那就直接使用 foreach 怎么样? - sebszyller
@sebszyller 使用foreach也会按顺序执行这些任务,而不是并行执行。 - chenzhongpu
@chenzhongpu 我的意思是将它们连接起来,而不是实际执行。 - sebszyller
当您调用val t = task { ... }时,关键字val启动任务执行。Join仅用于收集结果 - 因此您的代码是正确的。 - George
3个回答

3

使用并行编程课程中的框架

你可以像这样实现该方法:

def parallel[A](tasks: (() => A)*): Seq[A] = {
  if (tasks.isEmpty) Nil
  else {
    val pendingTasks = tasks.tail.map(t => task { t() })
    tasks.head() +: pendingTasks.map(_.join())
  }
}

请注意,你无法拥有可变数量的按名称参数 - 尽管这可以改变

然后像这样使用它:

object ParallelUsage {
  def main(args: Array[String]) {
    val start = System.currentTimeMillis()

    // Use a list of tasks:
    val tasks = List(longTask _, longTask _, longTask _, longTask _)
    val results = parallel(tasks: _*)
    println(results)

    // or pass any number of individual tasks directly:
    println(parallel(longTask, longTask, longTask))
    println(parallel(longTask, longTask))
    println(parallel(longTask))
    println(parallel())

    println(s"Done in ${ System.currentTimeMillis() - start } ms")
  }

  def longTask() = {
    println("starting longTask execution")
    Thread.sleep(1000)
    42 + Math.random
  }
}

使用Scala并行集合

没有比这更简单的了:

val tasks = Vector(longTask _, longTask _, longTask _)
val results = tasks.par.map(_()).seq

1

寻找一种实用的方式来构建parallel(),我发现它可以从Future中构建。这种范式对于使用现代Javascript Promises的任何人都会感到熟悉:

import scala.concurrent.{Await,Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

def parallel[A, B](taskA: =>A, taskB: =>B): (A,B) = {
  val fB:Future[B] = Future { taskB }
  val a:A = taskA
  val b:B = Await.result(fB, Duration.Inf)
  (a,b)
}

这将把任务B分离到自己的线程中,并在主线程中执行任务A。我们执行taskA并等待fB完成,如果需要,就永远等下去。请注意,我还没有测试过这种设置的异常情况,它可能会停滞或表现不良。

0
受到Future.sequence的启发并有点作弊。你需要一个既是Task实现又是Monad的实现来使这个设计工作。
  /** Transforms a `TraversableOnce[Task[A]]` into a `Task[TraversableOnce[A]]`.
   *  Useful for reducing many `Task`s into a single `Task`.
   */
  def parallel[
    A,
    M[X] <: TraversableOnce[X]
  ](in: M[Task[A]])(
    implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]],
    executor: ExecutionContext
  ): Task[M[A]] = {
    in.foldLeft(Task.point(cbf(in))) {
      (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
    }.map(_.result())(executor)
  }

这可以在大多数Scala集合中并行执行操作,唯一的条件是Task定义了mapflatMap,无论实现方式如何,因为您可以使用Scala库内部的implicit builder构造来抽象特定的集合类型。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接