Scala:将List [Future]转换为Future [List],忽略失败的futures

126
我正在寻找一种将任意长度的Future列表转换为List的Future的方法。我正在使用Play框架,所以最终我真正想要的是Future [Result],但为了简单起见,让我们只说Future [List[Int]]。通常的做法是使用Future.sequence(...) ,但有一个问题... 我得到的列表通常包含约10-20个Futures,并且其中一个Future失败并不罕见(它们在进行外部web服务请求)。

如果其中一个失败,我不想重新尝试所有Futures,而是只想获取已成功的结果并返回。

例如,下面的操作行不通:

```scala val futures = List(Future.successful(1), Future.failed(new RuntimeException("failed")), Future.successful(2), Future.failed(new RuntimeException("failed"))) val all = Future.sequence(futures) all // this will throw the exception "failed" ```
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

我希望能够获得1和3,而不仅仅是异常。我尝试使用Future.fold,但显然它只是在后台调用Future.sequence


如果您正在使用Twitter的Future,它有一个名为collectToTry的方法,您只需要过滤成功的即可。https://twitter.github.io/util/docs/com/twitter/util/Future$.html#collectToTry[A](fs:scala.collection.Seq[com.twitter.util.Future[A]]):com.twitter.util.Future[Seq[com.twitter.util.Try[A]]] - rvazquezglez
7个回答

159
诀窍是先确保没有任何一个Future失败。在这里,.recover方法是你的好朋友,你可以将它与map方法结合使用,将所有Future[T]结果转换为Future[Try[T]]实例,所有这些实例都肯定是成功的Future。
注意:你也可以在这里使用Option或Either,但如果你想特别捕获异常,Try是最干净的方式。
def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x)}

val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

然后像以前一样使用Future.sequence,这将为您提供一个Future[List[Try[T]]]
val futureListOfTrys = Future.sequence(listOfFutureTrys)

然后进行筛选:
val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

你甚至可以提取特定的故障,如果需要的话:

您甚至可以提取特定的故障,如果需要:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))

谢谢!.recover 确实是我所缺失的部分。 - Joe
22
你可以使用_.collect{ case Success(x) => x}代替_.filter(_.isSuccess),以消除futureListOfSuccesses中的Try类型。 - senia
43
在Scala 2010中,.recover(x => Failure(x))是无效的语法,应使用.recover({case e => Failure(e)}) - FGRibreau
我认为您缺少了 future wrapper: def futureToFutureOfTry[A](f: Future[A]): Future[Try[A]] = { val p = PromiseTry[A] f.map{ a=> p.success(scala.util.Success(a)) }.recover{ case x: Throwable => p.success(Failure(x)) } p.future } - Dario
请注意,如果您正在使用Finagle futures(com.twitter.util.Future),那么它是Future.collect(listOfFutures),它返回一个序列的future。 - nickf
显示剩余5条评论

18

Scala 2.12 在 Future.transform 上进行了改进,使得使用更少的代码便可实现相同的功能。

val futures = Seq(Future{1},Future{throw new Exception})

// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_)))) 

val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))

val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))

11

我尝试了Kevin的答案,但在我的Scala版本(2.11.5)上遇到了一个小问题......我进行了修正,并编写了一些额外的测试,如果有兴趣的话......这是我的版本>

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
      * The returned future is completed only once all of the futures in `fs` have been completed.
      */
    def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
      Future.sequence(listOfFutureTrys)
    }

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
      f.map(Success(_)) .recover({case x => Failure(x)})
    }

    def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isFailure))
    }

    def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isSuccess))
    }
}


// Tests... 



  // allAsTrys tests
  //
  test("futureToFutureTry returns Success if no exception") {
    val future =  Future.futureToFutureTry(Future{"mouse"})
    Thread.sleep(0, 100)
    val futureValue = future.value
    assert(futureValue == Some(Success(Success("mouse"))))
  }
  test("futureToFutureTry returns Failure if exception thrown") {
    val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
    Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
    val futureValue = future.value

    assertResult(true) {
      futureValue match {
        case Some(Success(Failure(error: IllegalStateException)))  => true
      }
    }
  }
  test("Future.allAsTrys returns Nil given Nil list as input") {
    val future =  Future.allAsTrys(Nil)
    assert ( Await.result(future, 100 nanosecond).isEmpty )
  }
  test("Future.allAsTrys returns successful item even if preceded by failing item") {
    val future1 =  Future{throw new IllegalStateException("bad news")}
    var future2 = Future{"dog"}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(1) == Success("dog"))
  }
  test("Future.allAsTrys returns successful item even if followed by failing item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(0) == Success("dog"))
  }
  test("Future.allFailedAsTrys returns the failed item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys.size == 1)
  }
  test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0) == Success("dog"))
    assert(listOfTrys.size == 1)
  }

7

我刚刚看到了这个问题,可以提供另一个解决方案:

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                 executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
      (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr
    } map (_.result())
}

这里的想法是在fold操作中,使用for-comprehension语法等待列表中的下一个元素完成,如果下一个元素失败,则回退到已有的内容。

我不喜欢这个名字,但我喜欢它的实现方式,直接从序列实现。 - crak

1
你可以轻松地使用选项包装未来的结果,然后展开列表:
def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
    f.map(Some(_)).recover {
      case e => None
    }
val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

val futureListOfSuccesses = futureListOfOptions.flatten

以防其他人在第一个函数中遇到Some错误,可以像这样重写第一个函数以防止编译器错误: def futureToFutureOption[T](f: Future[T]): Future[Option[T]] = f.map(Option(_)).recover { case e => None } - Zee

0
如果您需要出于某些原因保留失败的 futures,例如记录日志或条件处理,这在 Scala 2.12+ 中是可行的。您可以在此处找到可用的代码。
val f1 = Future(1)
val f2 = Future(2)
val ff = Future.failed(new Exception())

val futures: Seq[Future[Either[Throwable, Int]]] =
  Seq(f1, f2, ff).map(_.transform(f => Success(f.toEither)))

val sum = Future
  .sequence(futures)
  .map { eithers =>
    val (failures, successes) = eithers.partitionMap(identity)

    val fsum = failures.map(_ => 100).sum
    val ssum = successes.sum

    fsum + ssum
  }

assert(Await.result(sum, 1.second) == 103)

0

您还可以将成功和失败的结果分别收集到不同的列表中:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接