未来的递归模式/任意长度的未来链接

4

我想知道如何递归地构建一条Akka future链,使其按顺序运行,如果future中的doWork调用失败,则应重试该future最多3次,如果重试次数耗尽,则链应该失败。假设所有doWork调用都成功,返回的future futChain 应该只完成一次。

object Main extends App {
  val futChain = recurse(2)

  def recurse(param: Int, retries: Int = 3): Future[String] {
    Future {
      doWorkThatMayFailReturningString(param...)
    } recoverWith {
      case e => 
        if (retries > 0) recurse(param, retries -1)
        else  Future.failed(e)
    } flatMap {
      strRes => recurse(nextParam) //how should the res from the previous fut be passed?
    }
  }

  futChain onComplete {
    case res => println(res) //should print all the strings
  }
}
  1. 我该如何将结果作为集合返回?即在这个例子中,每个doWork函数返回的String(我需要修改recurse函数以返回Future[List[String]])。
  2. 我应该使用recover还是recoverWith
  3. 调用flatMap来链接这些调用是否可以?
  4. 我应该考虑尾递归和栈溢出问题吗?
  5. 我最好递归地构建一个future列表并缩减它们吗?

这个问题归结为如何使用递归创建任意长的未来链。不只是在 for 推导中放置一堆预定义的未来。 - NightWolf
1
foldLeft 可以用于创建动态大小的 Future 链。 - Randall Schulz
@RandallSchulz,您能否提供一个例子? - NightWolf
是的,但要等到我周一回到办公室才行。 - Randall Schulz
1个回答

12

您可以像这样实现可重试的Future

def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = {
    n match {
        case i if (i > 1) => f.recoverWith{ case t: Throwable => retry(f)(n - 1)}
        case _ => f
    }       
}

这段代码没有针对尾递归进行优化,但如果您只想重试几次,就不会出现堆栈溢出的情况(而且我想如果前几次失败了,它总会继续失败)。

然后,我会单独处理链接。如果您有一定数量的函数需要链接在一起,每个函数都依赖于前一个函数的结果(并且由于某种原因您希望聚合结果),则可以使用for推导式(是flatMap的语法糖):

for {
    firstResult <- retry(Future(doWork(param)))(3)
    secondResult <- retry(Future(doWork(firstResult)))(3)
    thirdResult <- retry(Future(doWork(secondResult)))(3)
} yield List(firstResult, secondResult, thirdResult)

对于任意长度的链,您可以使用Future.sequence(在Akka库中的Futures)并行执行它们:

def doWork(param: String): String = ...

val parameters: List[String] = List(...)

val results: Future[List[String]] = Future.sequence(parameters.map(doWork(_)))

这将把原本的List[Future[String]]解开为Future[List[String]]

以下是一种类似顺序执行的方法:

def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
    seq.foldLeft(Future.successful(List[B]())) { case (left, next) =>
        left.flatMap(list => f(next).map(_ :: list))
    }
}

def doWork(param: String): String = ...

val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param))) 

这些功能的实现非常敏感,与您的用例密切相关。如果链中的任何一个future失败,上述两个函数将返回失败的future。有时您需要这样做,有时则不需要。如果您只想收集成功的future,并且舍弃失败的future而不使整个结果失败,则可以添加额外的步骤来恢复失败。
此外,recoverrecoverWith之间的区别在于它接受的PartialFunction类型。recover使用默认值替换失败的future,而recoverWith使用另一个Future替换。在我的retry的情况下,recoverWith更合适,因为我正在尝试使用自身恢复失败的Future

是的,我理解重试语义。问题实际上是关于如何实现任意链的。您能否提供有关如何实现arb chains的示例。使用上面的for comprehension与递归不同。 - NightWolf
@NightWolf,你应该在问题中更清楚地表达。同时最好将重试和未来链接的实现分开。 - Michael Zajac
感谢更新!这是一个好的方法,类似于这个 http://www.michaelpollmeier.com/execute-scala-futures-in-serial-one-after-the-other-non-blocking/ 但问题在于这并不真正具有递归性质,因为我需要在开始时定义参数集。 - NightWolf
为什么需要使用递归?我的最后几个例子使用了预定义的参数集,所以我不太确定你在寻找什么。 - Michael Zajac

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接