使用Scala actors计算阶乘

3
如何使用Scala actors计算阶乘?
相较于其他方式,这种方法是否更加高效?
def factorial(n: Int): BigInt = (BigInt(1) to BigInt(n)).par.product

许多感谢。

检查计算阶乘的并行算法...也许问题的关键是对于哪些n,通过演员实现的并行算法比上述朴素方法更有效... - elm
1
使用并行前缀计算。请参阅维基百科:http://en.wikipedia.org/wiki/Prefix_sum#Parallel_algorithm - stefan.schwetschke
非常感谢@stefan.schwetschke,正在检查前缀和... - elm
1个回答

4

问题

您需要将输入分解成部分乘积。然后可以并行计算这些部分乘积。最后,将这些部分乘积相乘得到最终乘积。

这可以简化为更广泛的问题类别:所谓的并行前缀计算。您可以在Wikipedia上了解相关信息。

简短版:当您使用可结合操作_ * _计算a*b*c*d时,您可以构造计算a*(b*(c*d))(a*b)*(c*d)。通过第二种方法,您可以并行计算a*bc*d,然后从这些部分结果计算出最终结果。当然,如果您有更多的输入值,可以递归地执行此操作。

解决方案

免责声明

这听起来有点像一个作业。因此,我将提供一个具有两个特点的解决方案:

  1. 它包含一个小错误
  2. 它展示了如何通用地解决并行前缀问题,而不是直接解决该问题
您可以看到解决方案应该如何构建,但没有人可以用它来作弊她的作业。
详细解决方案
首先我需要一些导入
导入 akka.event.Logging import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration import akka.actor._
然后我创建了一些帮助类用于演员之间的通信
case class Calculate[T](values : Seq[T], segment : Int, parallelLimit : Int, fn : (T,T) => T)

trait CalculateResponse
case class CalculationResult[T](result : T, index : Int) extends CalculateResponse
case object Busy extends CalculateResponse

除了告诉接收者你很忙,演员也可以使用隐藏功能或实现自己的队列来获取部分结果。但在这种情况下,我认为发送方应该决定允许多少并行计算。

现在我创建演员:

class ParallelPrefixActor[T] extends Actor {
  val log = Logging(context.system, this)
  val subCalculation = Props(classOf[ParallelPrefixActor[BigInt]])
  val fanOut = 2
  def receive = waitForCalculation

  def waitForCalculation : Actor.Receive = {
    case c : Calculate[T] =>
      log.debug(s"Start calculation for ${c.values.length} values, segment nr. ${c.index}, from ${c.values.head} to ${c.values.last}")
      if (c.values.length < c.parallelLimit) {
        log.debug("Calculating result direct")
        val result = c.values.reduceLeft(c.fn)
        sender ! CalculationResult(result, c.index)
      }else{
        val groupSize: Int = Math.max(1, (c.values.length / fanOut) + Math.min(c.values.length % fanOut, 1))
        log.debug(s"Splitting calculation for ${c.values.length} values up to ${fanOut} children, ${groupSize} elements each, limit ${c.parallelLimit}")
        def segments=c.values.grouped(groupSize)
        log.debug("Starting children")
        segments.zipWithIndex.foreach{case (values, index) =>
          context.actorOf(subCalculation) ! c.copy(values = values, index = index)
        }
        val partialResults: Vector[T] = segments.map(_.head).to[Vector]
        log.debug(s"Waiting for ${partialResults.length} results (${partialResults.indices})")
        context.become(waitForResults(segments.length, partialResults, c, sender), discardOld = true)
      }
  }
  def waitForResults(outstandingResults : Int, partialResults : Vector[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = {
    case c : Calculate[_] => sender ! Busy
    case r : CalculationResult[T] =>
      log.debug(s"Putting result ${r.result} on position ${r.index} in ${partialResults.length}")
      val updatedResults = partialResults.updated(r.index, r.result)
      log.debug("Killing sub-worker")
      sender ! PoisonPill
      if (outstandingResults==1) {
        log.debug("Calculating result from partial results")
        val result = updatedResults.reduceLeft(originalRequest.fn)
        originalSender ! CalculationResult(result, originalRequest.index)
        context.become(waitForCalculation, discardOld = true)
      }else{
        log.debug(s"Still waiting for ${outstandingResults-1} results")
        // For fanOut > 2 one could here already combine consecutive partial results
        context.become(waitForResults(outstandingResults-1, updatedResults, originalRequest, originalSender), discardOld = true)
      }
  }
}

优化

使用并行前缀计算并不是最优的选择。计算较大数的参与者将比计算较小数的参与者做更多的工作(例如,当计算 1 * ... * 100 时,计算1 * ... * 10比计算90 * ... * 100更快)。因此,将数字混合在一起,使大数和小数混合可能是一个好主意。这在本例中有效,因为我们使用了交换操作。一般情况下,并行前缀计算只需要一个可结合的操作即可。

性能

理论上

对于少量数据而言,使用演员解决方案的性能不如“朴素”解决方案(使用并行集合)。当您进行复杂计算或在专用硬件(例如图形卡或FPGA)或多台机器上分发计算时,演员解决方案将会表现出色。使用演员,您可以控制谁执行哪些计算,甚至可以重新启动“挂起的计算”。这可以带来巨大的加速

在单台计算机上,当你有非统一内存架构时,演员模式可能会有所帮助。然后,您可以以某种方式组织演员,将内存钉住到某个处理器上。

一些测量

我使用IntelliJ IDEA中的Scala工作表进行了一些实际性能测量。

首先,我设置了演员系统:

// Setup the actor system
val system = ActorSystem("root")
// Start one calculation actor
val calculationStart = Props(classOf[ParallelPrefixActor[BigInt]])


val calcolon = system.actorOf(calculationStart, "Calcolon-BigInt")

val inbox = Inbox.create(system)

然后我定义了一个辅助方法来测量时间:
// Helper function to measure time
def time[A] (id : String)(f: => A) = {
  val start = System.nanoTime()
  val result = f
  val stop = System.nanoTime()
  println(s"""Time for "${id}": ${(stop-start)*1e-6d}ms""")
  result
}

然后我进行了一些性能测量:

// Test code
val limit = 10000
def testRange = (1 to limit).map(BigInt(_))

time("par product")(testRange.par.product)
val timeOut = FiniteDuration(240, TimeUnit.SECONDS)
inbox.send(calcolon, Calculate[BigInt]((1 to limit).map(BigInt(_)), 0, 10, _ * _))
time("actor product")(inbox.receive(timeOut))

time("par sum")(testRange.par.sum)
inbox.send(calcolon, Calculate[BigInt](testRange, 0, 5, _ + _))
time("actor sum")(inbox.receive(timeOut))

我得到了以下结果

> "par product"所需时间:134.38289毫秒
  res0: scala.math.BigInt = 284625968091705451890641321211986889014805140170279923
  079417999427441134000376444377299078675778477581588406214231752883004233994015
  351873905242116138271617481982419982759241828925978789812425312059465996259867
  065601615720360323979263287367170557419759620994797203461536981198970926112775
  004841988454104755446424421365733030767036288258035489674611170973695786036701
  910715127305872810411586405612811653853259684258259955846881464304255898366493
  170592517172042765974074461334000541940524623034368691540594040662278282483715
  120383221786446271838229238996389928272218797024593876938030946273322925705554
  596900278752822425443480211275590191694254290289169072190970836905398737474524
  833728995218023632827412170402680867692104515558405671725553720158521328290342
  799898184493136...
"actor product"所需时间:1310.217247毫秒 res2: Any = 计算结果(28462596809170545189064132121198688901480514017027 992307941799942744113400037644437729907867577847758158840621423175288300423399 401535187390524211613827161748198241998275924182892597878981242531205946599625 986706560161572036032397926328736717055741975962099479720346153698119897092611 277500484198845410475544642442136573303076703628825803548967461117097369578603 670191071512730587281041158640561281165385325968425825995584688146430425589836 649317059251717204276597407446133400054194052462303436869154059404066227828248 371512038322178644627183822923899638992827221879702459387693803094627332292570 555459690027875282242544348021127559019169425429028916907219097083690539873747 452483372899521802363282741217040268086769210451555840567172555372015852132829 034279989818449...
> "par sum"所需时间:6.488620999999999毫秒 res3: scala.math.BigInt = 50005000
> "actor sum"所需时间:657.752832毫秒 res5: Any = 计算结果(50005000,0)

您可以很容易地看到,使用Actor版本要比使用并行集合慢得多。


非常感谢 @stefan.schwetschke 提供这个令人印象深刻的答案!我正在学习和借鉴它... - elm
没问题。看一下下面的代码片段。这是一个修复了一些错误并添加了一些性能改进的版本。对于大型阶乘,Actor版本有时比使用并行集合的版本运行得更快。https://gist.github.com/geggo98/8913772 - stefan.schwetschke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接