Scala等待Future序列

38

我希望像下面这样的代码能够等待两个未来事件,但实际上它并不能。

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

我曾认为seq.onComplete会等待所有操作完成后再自行结束,但实际上它的结果是:

true
false
.sequence 在 scala.concurrent.Future 的源代码中有些难以理解,我想知道如何实现一个等待动态大小的序列中所有原始future完成的并行操作,或者这里可能存在什么问题。编辑:一个相关的问题:https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)
6个回答

53

等待所有结果(无论成功或失败)的常见方法是将失败转化为新的表示形式,使得所有future都完成并返回某个结果(尽管可能是代表失败的结果)。将其提升到 Try 是一种自然的方式。

Twitter 的 future 实现 提供了一个 liftToTry 方法,使这个过程变得简单。但你也可以使用标准库的实现来做类似的事情:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

现在,Future.sequence(lifted) 将会在所有 future 完成后完成,并使用 Try 来表示成功和失败。
因此,等待一系列 futures 的通用解决方案可能如下所示,假设执行上下文当然是隐式可用的。
  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }

非常感谢,我可能会使用并调整这个函数!您是否认为使用Try本质上是将执行结果的输出值或流程控制异常映射到成功或失败的返回值? - matanster
顺便说一下,我同样可以使用 Promise 来实现相同的转换,而不是 Try。在需要扩展 Future 特质(使用隐式类进行 pimping)的地方,我已经开始使用 Promise 了。 - matanster
2
你几乎肯定不想在这里使用 promises。Try 本质上是一个同步的 Future,其中你只有两种状态(成功和失败),而不是三种状态(未满足、成功、失败),这正好适用于这个问题。 - Travis Brown

27

Future.sequence生成的Future在以下情况下完成:

  • 所有Future成功完成,或者
  • 其中一个Future失败了

你遇到的是第二种情况,在这种情况下,让包装的Future在其中一个嵌套的Future失败时就完成是有意义的。因为在失败情况下,封装的Future只能容纳单个Throwable。等待其他未完成的Future没有意义,因为结果将会是相同的失败。


1
谢谢,对的,我应该从 .sequence 类型的一元特性中收集到那种行为。但是重点是,我想等待它们全部完成;使用 .sequence 只是一种(错误的)方式。最直接的方法是怎样实现呢? - matanster
2
这并没有真正回答问题的第二部分。想要收集所有结果,无论成功与否,是完全合理的,遗憾的是标准库的 futures 实现没有像 Twitter 的 liftToTry 这样的东西来方便实现。 - Travis Brown
2
@TravisBrown 不需要在stdlib Futures中特别处理该方法:f.transform(Success(_))。 - Viktor Klang
@ViktorKlang…不幸的是,它使用了一个2.12之前不存在的“transform”。 :) - Travis Brown
2
@TravisBrown 是的。但现在它已经存在了,所以使用它吧。 - Viktor Klang
6
除非你是成千上万将来数年内将被困在使用或至少支持2.11的Scala开发人员之一。 - Travis Brown

3

这是支持前面答案的一个例子。只使用标准的Scala API,就有一种简单的方法可以实现这一点。

在这个例子中,我创建了3个future。它们分别在5、7和9秒完成。调用 Await.result 将会阻塞,直到所有future都已经解决为止。一旦所有3个future都完成了,a 就会被设置为 List(5,7,9) 并继续执行。

此外,如果任何一个future中抛出异常,Await.result将立即取消阻塞并抛出异常。取消注释Exception(...)行以查看其工作原理。

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ⇒
      e.printStackTrace()
  }

2

虽然这是一个相当老的问题,但这是我最近如何使其运行的方法。

    object Fiddle {
      val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }
    
      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }
    
      val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
    
      val seq = Future.sequence(lf) 
      import scala.concurrent.duration._
      Await.result(seq, Duration.Inf) 
    }

这个操作不会完成,它会一直等待直到所有的未来任务都完成。您可以根据自己的使用情况更改等待时间。我将其设置为无限,并且在我的情况下需要。


1
我们可以通过隐式类为Seq [Future [T]]增加自己的onComplete方法:
  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

然后,在您的特定MWE中,您可以这样做:
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

这种解决方案的优点在于,它允许您像在单个future上一样调用一个序列future上的onComplete


希望我们在新的集合库中不需要这种诡计。 - matanster
@matanster,新集合库的开发中是否有什么特别之处,让你希望新集合库可以通过更少的技巧实现相同的行为? - Bruno

1

试着避免额外的麻烦,创造未来。

implicit val ec = ExecutionContext.global

val f1 = Future {
  Try {
    throw new Throwable("kaboom")
  }
}

val f2 = Future {
  Try {
    Thread.sleep(1000L)
    2
  }
}

Await.result(
  Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
  case Success(res) => println(s"Success. $res")
  case Failure(e)   => println(s"Failure. ${e.getMessage}")
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接