使用可交换和结合的运算符对Future列表进行折叠/缩减

14

考虑以下内容:

import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global

def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => future(slowInt(i)))

def timeFuture(fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  Await.result(fn, Inf)
  println((System.currentTimeMillis - t0) / 1000.0 + "s")
}

下面两个代码块都需要大约2.5秒的时间来执行:

// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }

// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }

据我所了解,这是因为Future.reduce/traverse是通用的,因此不会因为使用结合运算符而运行得更快,但是否有一种简单的方法来定义一个计算,使得折叠/缩减在至少2个值可用时(在fold的情况下为1),从而在列表中仍然生成某些项目的同时,已经生成的项目已经被计算?


好的,明白了。不过我会等待更多的想法。 - Erik Kaplun
看起来你应该可以使用RxScala Observable来完成这个任务,但我不确定如何将它们链接在一起。 - DaoWen
1
一旦你有了selectchooseAny函数,就很容易从列表中取出两个完成的项目,将它们的总和的未来放回到列表中,并递归(假设您的操作既是可结合的又是可交换的)。这里是一个快速示例,它使用我以前为Scalaz的未来编写的collapse方法。 - Travis Brown
@TravisBrown:感谢您指出需要满足交换律;我还将研究Scalaz的“Nondeterminism”。 - Erik Kaplun
@TravisBrown:如果您将您的评论发布为答案,我会接受它——这似乎是唯一最快的解决方案。 - Erik Kaplun
显示剩余3条评论
3个回答

3

Scalaz有一个实现了未来的库,其中包含一个chooseAny组合器,它接受一个未来的集合,并返回一个元组的未来,该元组由第一个完成的元素和其余未来组成:

def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]

Twitter的实现将futures称为select。标准库中不包括它(但请参见Som Snytt上面指出的Viktor Klang的实现)。我将在这里使用Scalaz的版本,但翻译应该很简单。
一个让操作按您所需运行的方法是从列表中取出两个已完成的项目,将其总和的future推回列表,并递归(有关完整的工作示例,请参见this gist)。
def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
  Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
    _.flatMap {
      case (hv, tf) =>
        Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
          _.flatMap {
            case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
          }
        )
    }
  )

在您的情况下,您需要调用类似这样的内容:
timeFuture(
  collapse(futures)(
    Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
  )
)

在我的双核笔记本电脑上只需要略微超过1.6秒就可以运行,所以它正在按预期工作(即使slowInt的执行时间发生变化,它也会继续执行您想要的操作)。


1
为了获得与您相似的计时,我需要使用本地ExecutionContext,像这样(从这里):
implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

在此之后,我通过将列表拆分并将它们分配给vals来获得更好的性能(基于记住for循环中的futures是按顺序处理的,除非它们在for循环之前被分配给vals)。由于列表的关联性质,我可以使用同一函数再次调用它们。我修改了timeFuture函数以接受描述并打印添加结果:
def timeFuture(desc: String, fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  val res = Await.result(fn, Inf)
  println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}

我刚接触Scala,还在研究如何在最后一步重用同一个函数(我认为这应该是可能的),所以我作弊了,创建了一个辅助函数:

def futureSlowAdd(x: Int, y: Int) = future(slowAdd(x, y))

然后我可以做以下事情:
timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )

val right = Future.reduce(futures.take(10))(slowAdd)
val left = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (right zip left) flatMap (futureSlowAdd _).tupled)

带有 这里 的最后一个压缩文件。

我认为这是并行化工作并重新组合结果。当我运行它们时,我得到:

reduce = 210 in 2.111s
split futures = 210 in 1.201s

我使用了硬编码的一对takes,但我的想法是整个列表分割可以放入一个函数中,并实际重用分配给右侧和左侧分支的关联函数(由于余数允许稍微不平衡的树),最终达到目的。
当我对 slowInt()slowAdd() 函数进行随机化时:
def rand(): Int = Random.nextInt(3)+1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }

我仍然认为“分裂未来”比“减少未来”更快完成。启动时似乎存在一些开销,这会影响第一个timeFuture调用。以下是在启动惩罚超过“分裂未来”的情况下运行它们的几个示例:
split futures = 210 in 2.299s
reduce = 210 in 4.7s

split futures = 210 in 2.594s
reduce = 210 in 3.5s

split futures = 210 in 2.399s
reduce = 210 in 4.401s

在比我的笔记本电脑更快的计算机上,并且使用与问题中相同的ExecutionContext,我没有看到如此大的差异(在slow*函数中没有随机化):
split futures = 210 in 2.196s
reduce = 210 in 2.5s

在这里,看起来“分裂未来”只是领先一点。


最后一次尝试。这是一个函数(也称为丑陋的东西),它扩展了我上面提到的想法:

def splitList[A <: Any]( f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
    def applyAssocFn( x: Future[A], y: Future[A]): Future[A] = {
      (x zip y) flatMap( { case (a,b) => future(assocFn(a, b)) } )
    }
    def divideAndConquer( right: List[Future[A]], left: List[Future[A]]): Future[A] = {
      (right, left) match {
        case(r::Nil, Nil) => r
        case(Nil, l::Nil) => l
        case(r::Nil, l::Nil) => applyAssocFn( r, l )
        case(r::Nil, l::ls) => {
          val (l_right, l_left) = ls.splitAt(ls.size/2)
          val lret = applyAssocFn( l, divideAndConquer( l_right, l_left ) )
          applyAssocFn( r, lret )
        }
        case(r::rs, l::Nil) => {
          val (r_right, r_left) = rs.splitAt(rs.size/2)
          val rret = applyAssocFn( r, divideAndConquer( r_right, r_left ) )
          applyAssocFn( rret, l )
        }
        case (r::rs, l::ls) => {
          val (r_right, r_left) = rs.splitAt(rs.size/2)
          val (l_right, l_left) = ls.splitAt(ls.size/2)
          val tails = applyAssocFn(divideAndConquer( r_right, r_left ), divideAndConquer( l_right, l_left ))
          val heads = applyAssocFn(r, l)
          applyAssocFn( heads, tails )
        }
      }
    }
    val( right, left ) = f.splitAt(f.size/2)
    divideAndConquer( right, left )
  }

将列表进行非尾递归拆分并尽快将 futures 分配给值(以便启动它们)会使 Scala 失去所有美感。 当我进行以下测试时:
timeFuture( "splitList", splitList( futures.toList, slowAdd) )

我使用newCachedThreadPool()在我的笔记本电脑上得到了以下时间:
splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s

我注意到“拆分期货”的时间可能无效,因为期货是在timeFutures块外启动的。但是,splitList函数应该在timeFutures函数内正确调用。对我来说,一个重要的收获是选择最适合硬件的ExecutionContext的重要性。

我认为这个问题遇到了我在Dave Swartz的答案中指出的同样限制。 - Erik Kaplun
在我的测试中,“split futures”总是比“reduce”更快完成。即使我随机化slowInt()slowAdd()函数也是如此。Future.sequence具有for-comprehension,我试图在其中启动futures。在这种情况下,我比预期提前了一半的启动时间。 - n0741337
我并不是说这种方法不更快;你可以尝试使用 futures = [400ms,200ms,400ms,200ms],你应该会发现它花费了 700ms 的时间,但也可能只需 600ms - 我错了吗? - Erik Kaplun
无论如何,看起来我们不会得到更多的解决方案,所以至少+1 :) 感谢您的尝试。 - Erik Kaplun
1
顺便说一下,splitList 函数能够在大约 600 毫秒内处理 futures = [400毫秒, 200毫秒, 400毫秒, 200毫秒] 这种情况。而使用任何一个 ExecutionContext 启动时都会有一定的启动惩罚,所以对于第二个案例,“reduce” 方式需要耗费0.701秒,而 splitList 方式只需耗费0.642秒。无论哪种方式,启动惩罚约为40毫秒。 - n0741337
显示剩余2条评论

1
以下答案在20核机器上运行时间为700毫秒,考虑到需要按顺序运行的内容,这是任何机器和实现方式所能做到的最好结果(20个并行200毫秒的slowInt调用,接着是5个嵌套的100毫秒slowAdd调用)。在我的4核机器上运行时间为1600毫秒,这也是该机器所能达到的最佳结果。
slowAdd被展开时,其中f代表slowAdd:
f(f(f(f(f(x1, x2), f(x3, x4)), f(f(x5, x6), f(x7, x8))), f(f(f(x9, x10), f(x11, x12)), f(f(x13, x14), f(x15, x16)))), f(f(x17, x18), f(x19, x20)))

您提供的例子使用了Future.sequence,在一个20核心机器上运行需要2100毫秒(20个并行的200毫秒slowInt调用,然后是19个嵌套的100毫秒slowAdd调用)。在我的4核心机器上运行需要2900毫秒。
slowAdd调用被扩展时,f代表slowAdd:
f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(x1, x2), x3), x4), x5), x6), x7), x8), x9), x10), x11), x12), x13), x14), x15) x16) x17) x18) x19) x20)
Future.reduce 方法调用 Future.sequence(futures).map(_ reduceLeft op),因此您提供的两个示例是等效的。
我的答案使用一个名为 combine 的函数,该函数接受一组 futures 和一个 op,即将两个 futures 组合成一个的函数作为参数。该函数返回应用于所有 futures、成对 futures、成对成对 futures 等等的 op,直到只剩下一个 future,然后返回该 future:
def combine[T](list: List[Future[T]], op: (Future[T], Future[T]) => Future[T]): Future[T] =
  if (list.size == 1) list.head
  else if(list.size == 2) list.reduce(op)
  else list.grouped(2).map(combine(_, op)).reduce(op)

注意:我稍微修改了你的代码以符合我的风格偏好。
def slowInt(i: Int): Future[Int] = Future { Thread.sleep(200); i }
def slowAdd(fx: Future[Int], fy: Future[Int]): Future[Int] = fx.flatMap(x => fy.map { y => Thread.sleep(100); x + y })
var futures: List[Future[Int]] = List.range(1, 21).map(slowInt)

以下代码使用combine函数来解决您的问题:
timeFuture(combine(futures, slowAdd))

以下代码更新了您的Future.sequence示例,以适应我的修改:
timeFuture(Future.sequence(futures).map(_.reduce{(x, y) => Thread.sleep(100); x + y }))

你假设所有的 slowInt 调用完成所需时间相同...那如果最初的 slowInt 列表需要[200毫秒、400毫秒、200毫秒、400毫秒]才能完成呢?使用当前的算法,你将有两组[200毫秒、400毫秒],因此生成列表仍需要400毫秒+连续跟随3个 slowAdd调用 => (400+300)毫秒=700毫秒,即使在这种情况下,先将两个200毫秒的 slowInt 相加,这样当两个400毫秒的操作完成时,已经完成了100毫秒的工作,然后再进行2次 slowAdd,总计(400+100+100)毫秒=600毫秒——我有遗漏什么吗? - Erik Kaplun
谢谢指出Future.reduce在内部使用Future.sequence,顺便问一下,我在提问之前是否应该自己查看源代码。 - Erik Kaplun
你是正确的。如果 slowInt 调用不恰好需要 200 毫秒完成,那么算法可以得到改进。 - Dave Swartz
好的,看起来我们没有得到更多的解决方案,所以至少+1 :) 感谢你的尝试。 - Erik Kaplun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接