Scala Futures - 如何从两个Futures中获取结果或失败信息

3
我正在使用for来并行运行2个Future。我想知道哪个成功了,哪个失败了(所有情况都应该运行到完成,要么是结果,要么是失败状态)。目前我只能检索到组合的成功结果。
我从这里得到了灵感,但它不够完善,因为当一个失败时,我无法获得成功状态,而在两个失败的情况下,我也无法获得两者的失败状态:failure in Scala future's for comprehension
case class TaggedException(context:String, val throwable: Throwable) extends Exception(throwable.getMessage)

val f1 = Future {...}.recoverWith {case e:Throwable => Future.Failed(new TaggedException("first one failed", e))}
val f2 = Future {...}.recoverWith {case e: Throwable => Future.Failed(new TaggedException("second one failed", e))}

val combinedResult = for {
  r1 <- f1
  r2 <- f2
} yield (r1,r2)

combinedResult.onFailure {
case e : TaggedException => ... // if both fail I only get the first line in the for
// in case where single fails I only know fail status without the success of the second
}

我想避免这个混乱:

var countCompleted = 0 ... or some other atomic way to count 
f1 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

f2 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

编辑:另一版本 - 这是一个有效的方法吗?
def toFutureTry[A](future: Future[A]):Future[Try[A]] = future.map(Success(_)).recover {case t: Throwable => Failure(t)}

    val fa: Future[Try[Blah]] = toFutureTry(f1)
    val fb: Future[Try[Foo]] = toFutureTry(f2)

    val combinedRes = for {
      ra <- fa
      rb <- fb
    } yield (ra,rb)

    combinedRes.onComplete {
      case Success(successRes: (Try[Blah], Try[Foo])) => // all of these cases are success or fails
      case Failure(f: Throwable) => // i think it is unused right?
    }
4个回答

2
你可以像这样结合使用 transformzip

val combinedResult: Future[(Try[T], Try[T])] =
  f1.transform(Success(_)).zip(f2.transform(Success(_)))

然后您可以执行:
combinedResult map {
  case (Success(v1), Success(v2)) =>
  case (Success(v1), Failure(f2)) =>
  case (Failure(f1), Success(v2)) =>
  case (Failure(f1), Failure(f2)) =>
}

1

Future[A]上使用flatMap无法帮助您,因为它总是会在其中一个失败的情况下短路,而您真正想要的是累积错误。

使用Future.traverse解决方案可以处理任意数量的Future[A]实例:

val f1 = Future.failed[Int](new Exception("42")).recoverWith {
  case e: Throwable => Future.failed(TaggedException("first one failed", e))
}

val f2 = Future(42).recoverWith {
  case e: Throwable =>
    Future.failed(TaggedException("second one failed", e))
}

val res: Future[List[Either[Throwable, Int]]] = 
  Future
   .traverse(List(f1, f2)) {
      eventualInt => eventualInt
       .map(i => Right(i))
       .recover { case err => Left(err) }
   }

res.onComplete {
  case Failure(exception) =>
    println(exception)
  case Success(value) =>
    value.foreach {
      case Right(int) => println(s"Received num: $int")
      case Left(err) => println(s"Oh no, err: $err")
    }
}

Await.result(res, Duration.Inf)

我们还可以从 Validated 类型中得到一些猫的帮助:

import cats.data.Validated.{Invalid, Valid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import cats.implicits._
import scala.util.{Failure, Success}

def main(args: Array[String]): Unit = {
  case class TaggedException(context: String, throwable: Throwable)
    extends Exception(throwable.getMessage)

  val f1 = Future.failed[Int](new Exception("42")).recoverWith {
    case e: Throwable => Future.failed(TaggedException("first one failed", e))
  }

  val f2 = Future(42).recoverWith {
    case e: Throwable => Future.failed(TaggedException("second one failed", e))
  }

  val res: Future[List[Validated[Throwable, Int]]] = 
    List(f1, f2)
     .traverse(eventualInt => eventualInt
                       .map(i => Valid(i))
                       .recover { case err => Invalid(err) })

  res.onComplete {
    case Failure(exception) =>
      println(exception)
    case Success(value) =>
      value.foreach {
        case Valid(int) => println(s"Received num: $int")
        case Invalid(err) => println(s"Oh no, err: $err")
      }
  }

  Await.result(res, Duration.Inf)
}

将产生以下结果:
Oh no, err: TaggedException$3: 42
Received num: 42

and without catz :)_ - Avba
@AvnerBarr James的方法可行,因为他用Try[A]包装了内部的thunk,然后在其上使用flatMap不会短路。 - Yuval Itzchakov
但是我得到了一个函数调用的结果,它是一个未来对象 - 我需要将“try”插入其中。 - Avba
如果您想使用的话,我可以提供一个将List [Future [A]]实现为Future [List [A]]的具体实现.. 但这将是非常特定的。 - Yuval Itzchakov
这是一种有效的方法吗?def returnsAFuture() = Future {1/0} val f3 = returnsAFuture() def toFutureTry[T](future: Future[T]) = future.map(Success(_)).recover{case t : Throwable => Failure(t)} val sdf: Future[Try[Int] with Product with Serializable] = toFutureTry(f3) val successFail = f3.map{ Success(_) }.recover { case t: Throwable => Failure(t) } - Avba
显示剩余2条评论

0

使用for-comprehensions时,只要有一行失败,代码就会停在那里并抛出任何异常。如果r1 < - f1引发了Throwable,那么r2 < - f2首先就不会被执行。

个人建议将它们都放入Either[Throwable, whatever]中,而不是将每个.recoverWith放入Future.Failed(...)中。这样,您可以对任何得到的Left值做些事情,并对任何得到的Right值做其他事情。或者您可以使用...具体取决于您想要如何处理错误。

我不知道您具体的用例,但如果您想单独处理每个成功或失败的情况,可以像这样做:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

val f1 = Future(Try(1 / 1))
val f2 = Future(Try(1 / 0))

// for-comprehensions won't fall over when reading a Failure
// as failures don't extend Throwable
val combinedResult = for {
  r1 <- f1 // Success(1)
  r2 <- f2 // Failure(java.lang.ArithmeticException: / by zero)
} yield List(r1,r2)

combinedResult.map { // get inside Future
  f =>
    f.map { // get inside List
      case Success(a) => // do something with success
      case Failure(e: IndexOutOfBoundsException) => // do something with failure
      case Failure(e: ArithmeticException) => // do something with failure
      case Failure(e) => // do something with failure
    }
}

我个人不太喜欢使用onComplete;我更喜欢在Future内部通过映射来保留数据,只要可以的话。不过这只是个人偏好。


你能举个例子详细说明吗? - Avba
我已经更新了答案,并提供了一个通用示例,展示我处理Futures中的异常的方式...如果您没有某种类型类或特征等内容,将其包装在Try中可以真正帮助您,直到稍后再处理异常。 - James Whiteley
不行,这样做不起作用。for-comprehension 的结果是一个失败的 future,所以你没有任何东西可以映射。 - Dima
@Dima 不是这样的,这个for-comprehension的结果是一个Future[List[Try[Int]]] - James Whiteley
哦,我错过了 Try ... 抱歉。 - Dima

0

首先,你并没有并行运行你的 futures(for-comprehension 会按顺序运行它们)。

更新:如评论中所述,上述内容不正确。我错过了 futures 是在 for-comprehension 外创建的这一点。

其次,正如你注意到的那样,如果其中一个 future 失败,另一个 future 就会丢失。

为了解决一个 future 失败导致另一个 future 的结果丢失的问题,你可以将你的 future 值“提升”到 Try 中:

val lifted1: Future[Try[Foo]] = f1.transform(Success(_))
val lifted2: Future[Try[Bar]] = f1.transform(Success(_))

现在,你可以做类似这样的事情:

 (lifted1 zip lifted2).map { 
   case (Success(foo), Success(bar)) => // both succeeded!
   case (Success(foo), Failure(t)) => // foo succeeded, bar failed with t
   case (Failure(t), Success(bar)) => // guess what!
   case (Failure(t1), Failure(t2)) => // oops
 }

如果你经常这样做,你可能会发现使用 lift 操作来“升级”你的 futures 很有用:
 object FutureSyntax { 
   implicit class FutureOps[A](val fu: Future[A]) extends AnyVal {
      def liftToTry: Future[Try[A]] = fu.transform(Success(_))
   }
 }

所以,现在如果你 import FutureSyntax._,上面的代码可以写成:

 (f1.liftToTry zip f2.liftToTry).map { 
    case (Success(foo), Success(bar)) => ... 
    ...
 }

你也可以使用for-comprehension来编写相同的代码,只是会更冗长。在这种情况下,我不会选择for-comprehension:当处理一系列future时,它们很好用,其中后面的future依赖于前面的结果。对于处理独立的futures,zipsequence通常是更好的选择。

你没有并行运行你的 futures(for-comprehension 将按顺序运行它们)。他在使用 for-comprehension 之前急切地创建了 futures。那里没有顺序性。 - Yuval Itzchakov
是的,它们应该在并行运行,因为它们是在循环外创建的。 - Avba
你可以使用 fu.transform(Success(_)) 替代 fu.map(Success.apply).recover { case NonFatal(t) => Failure(t) - Viktor Klang
@Dima 我指的是只有一个参数的方法。 - Viktor Klang
啊,是的...我还在用2.11版本 :/ - Dima
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接