为了获得与您相似的计时,我需要使用本地ExecutionContext,像这样(
从这里):
implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
在此之后,我通过将列表拆分并将它们分配给vals来获得更好的性能(基于记住for循环中的futures是按顺序处理的,除非它们在for循环之前被分配给vals)。由于列表的关联性质,我可以使用同一函数再次调用它们。我修改了
timeFuture
函数以接受描述并打印添加结果:
def timeFuture(desc: String, fn: => Future[_]) = {
val t0 = System.currentTimeMillis
val res = Await.result(fn, Inf)
println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}
我刚接触Scala,还在研究如何在最后一步重用同一个函数(我认为这应该是可能的),所以我作弊了,创建了一个辅助函数:
def futureSlowAdd(x: Int, y: Int) = future(slowAdd(x, y))
然后我可以做以下事情:
timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )
val right = Future.reduce(futures.take(10))(slowAdd)
val left = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (right zip left) flatMap (futureSlowAdd _).tupled)
带有 这里 的最后一个压缩文件。
我认为这是并行化工作并重新组合结果。当我运行它们时,我得到:
reduce = 210 in 2.111s
split futures = 210 in 1.201s
我使用了硬编码的一对takes,但我的想法是整个列表分割可以放入一个函数中,并实际重用分配给右侧和左侧分支的关联函数(由于余数允许稍微不平衡的树),最终达到目的。
当我对
slowInt()
和
slowAdd()
函数进行随机化时:
def rand(): Int = Random.nextInt(3)+1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }
我仍然认为“分裂未来”比“减少未来”更快完成。启动时似乎存在一些开销,这会影响第一个
timeFuture
调用。以下是在启动惩罚超过“分裂未来”的情况下运行它们的几个示例:
split futures = 210 in 2.299s
reduce = 210 in 4.7s
split futures = 210 in 2.594s
reduce = 210 in 3.5s
split futures = 210 in 2.399s
reduce = 210 in 4.401s
在比我的笔记本电脑更快的计算机上,并且使用与问题中相同的ExecutionContext,我没有看到如此大的差异(在slow*函数中没有随机化):
split futures = 210 in 2.196s
reduce = 210 in 2.5s
在这里,看起来“分裂未来”只是领先一点。
最后一次尝试。这是一个函数(也称为丑陋的东西),它扩展了我上面提到的想法:
def splitList[A <: Any]( f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
def applyAssocFn( x: Future[A], y: Future[A]): Future[A] = {
(x zip y) flatMap( { case (a,b) => future(assocFn(a, b)) } )
}
def divideAndConquer( right: List[Future[A]], left: List[Future[A]]): Future[A] = {
(right, left) match {
case(r::Nil, Nil) => r
case(Nil, l::Nil) => l
case(r::Nil, l::Nil) => applyAssocFn( r, l )
case(r::Nil, l::ls) => {
val (l_right, l_left) = ls.splitAt(ls.size/2)
val lret = applyAssocFn( l, divideAndConquer( l_right, l_left ) )
applyAssocFn( r, lret )
}
case(r::rs, l::Nil) => {
val (r_right, r_left) = rs.splitAt(rs.size/2)
val rret = applyAssocFn( r, divideAndConquer( r_right, r_left ) )
applyAssocFn( rret, l )
}
case (r::rs, l::ls) => {
val (r_right, r_left) = rs.splitAt(rs.size/2)
val (l_right, l_left) = ls.splitAt(ls.size/2)
val tails = applyAssocFn(divideAndConquer( r_right, r_left ), divideAndConquer( l_right, l_left ))
val heads = applyAssocFn(r, l)
applyAssocFn( heads, tails )
}
}
}
val( right, left ) = f.splitAt(f.size/2)
divideAndConquer( right, left )
}
将列表进行非尾递归拆分并尽快将 futures 分配给值(以便启动它们)会使 Scala 失去所有美感。
当我进行以下测试时:
timeFuture( "splitList", splitList( futures.toList, slowAdd) )
我使用
newCachedThreadPool()
在我的笔记本电脑上得到了以下时间:
splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s
我注意到“拆分期货”的时间可能无效,因为期货是在
timeFutures
块外启动的。但是,
splitList
函数应该在
timeFutures
函数内正确调用。对我来说,一个重要的收获是选择最适合硬件的ExecutionContext的重要性。
select
或chooseAny
函数,就很容易从列表中取出两个完成的项目,将它们的总和的未来放回到列表中,并递归(假设您的操作既是可结合的又是可交换的)。这里是一个快速示例,它使用我以前为Scalaz的未来编写的collapse
方法。 - Travis Brown