如何在Scala中使用Future进行轮询?

9
我希望能够轮询API端点直到达到某个条件。我预计它会在几秒钟到一分钟内达到这个条件。我有一个调用端点并返回Future的方法。有没有办法将Future链接在一起,以便每隔n毫秒轮询此端点,并在t次尝试后放弃?
假设我有一个具有以下签名的函数:
def isComplete(): Future[Boolean] = ???

在我看来,最简单的方法是将所有操作都变成阻塞式的:

def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}

但这种方法可能会占用所有线程,而且它不是异步的。我还考虑了递归的实现:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}

然而,我担心调用栈会达到最大值,因为这不是尾递归。

有没有更好的方法来解决这个问题?

编辑:我正在使用akka。


1
根据您的需求,您可能想考虑使用Akka的scheduler - Leo C
3个回答

8
你可以使用Akka Streams。例如,每500毫秒调用isComplete直到Future的结果为true,最多执行五次:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}

这是一个很好的答案。实际上,我需要一些来自 isComplete 的数据,这些数据在我的简化实现中没有被表示出来,我使用了 inclusive = truetakeWhileSink.lastOption 上获取了这些数据。留下这个答案,让其他人也可以这样做。 - user1943992
这并不意味着如果我们在进行完所有5次尝试之前获得了成功的结果,我们就不会继续尝试,对吗? - gravetii

4

实际上它根本不是递归,因此堆栈会很好。

您的方法可以改进的一点是使用某种调度程序来代替Thread.sleep,以便您不会阻塞线程。

此示例使用标准Java的TimerTask,但如果您使用某种框架(如Akka、Play或其他),它可能有自己的调度程序,那将是更好的选择。

object Scheduler {
   val timer = new Timer(true)
   def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
     val promise = Promise[T]()
     timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
     promise.future
   }
}


def untilComplete(attempts: Int = 10) = isComplete().flatMap { 
   case true => Future.successful(())
   case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
   case _ => throw new Exception("Attempts exhausted.") 
}

感谢您的回答。我正在使用akka。在这个例子中,使用 akka.pattern 中的 after 代替您的 Scheduler.after 是否可行? - user1943992
当然,那很完美。 - Dima
还有这个不是递归吗?untilComplete调用了自身。 - user1943992
1
“untilComplete” 要求 “Scheduler” 在不同的线程中调用自身,使用不同的调用堆栈。 - Steve Waldman
1
@user1943992,就像Steve所说的那样 :) 它并没有真正地调用自己,而是创建了一个匿名函数来调用自己,并将其作为参数传递给Scheduler,在此调用完成后,Scheduler会在某个时间点再次调用它。但更重要的是,所有这些都发生在传递给.flatMap的匿名函数内部,在第一次调用完成后,该函数本身将被调用。这就是你原始代码片段不是递归的原因,即使它没有使用Scheduler - Dima

4
我为自己创建了一个库来实现这个功能。我有...
trait Poller extends AutoCloseable {
  def addTask[T]( task : Poller.Task[T] ) : Future[T]
  def close() : Unit
}

Poller.Task长成这个样子

class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )
Poller轮询每个period,直到pollFor方法成功(返回Some[T])或超时timeout

方便起见,当我开始轮询时,我将其包装在Poller.Task.withDeadline中:

final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
  def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}

该功能将任务的(不可变,可重用) timeout 时间转换为每次轮询尝试的截止时间以进行超时处理。

为了高效地进行轮询,我使用Java的 ScheduledExecutorService

def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
  val promise = Promise[T]()
  scheduleTask( Poller.Task.withDeadline( task ), promise )
  promise.future
}

private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
  if ( isClosed ) { 
    promise.failure( new Poller.ClosedException( this ) )
  } else {
    val task     = twd.task
    val deadline = twd.deadline

    val runnable = new Runnable {

      def run() : Unit = {
        try {
          if ( ! twd.timedOut ) {
            task.pollFor() match {
              case Some( value ) => promise.success( value )
              case None          => Abstract.this.scheduleTask( twd, promise )
            }
          } else {
            promise.failure( new Poller.TimeoutException( task.label, deadline ) )
          }
        }
        catch {
          case NonFatal( unexpected ) => promise.failure( unexpected )
        }
      }
    }

    val millis = task.period.toMillis
    ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
  }
}

这段代码似乎很好地工作,而且不需要阻塞单个线程或使其进入睡眠状态。

(看一下库,有很多可以做的事情,以使其更清晰、易读,并通过将Poller.Task.withDeadline的角色澄清为该类的原始构造函数private来澄清截止日期应始终从任务timeout计算,不应是任意自由变量。)

此代码来自此处(框架和特性)此处(实现)。(如果您想直接使用它,maven坐标在此处。)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接