Akka监管策略 - 正确使用案例

4
我一直在使用Akka Supervisor策略来处理业务逻辑异常。阅读最著名的Scala博客系列Neophyte之一,我发现他给出了一个不同的目的,这与我一直在做的相比有所不同。
例如:假设我有一个HttpActor,应该联系外部资源,如果它关闭,我会抛出一个异常,现在是一个ResourceUnavailableException
如果我的Supervisor捕捉到了这个异常,我将调用我的HttpActor上的Restart,在我的HttpActor preRestart方法中,我将调用schedulerOnce来重试。
演员:
class HttpActor extends Actor with ActorLogging {

  implicit val system = context.system

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting Actor due: ${reason.getCause}")
    message foreach { msg =>
      context.system.scheduler.scheduleOnce(10.seconds, self, msg)
    }
  }

  def receive = LoggingReceive {

    case g: GetRequest =>
      doRequest(http.doGet(g), g.httpManager.url, sender())
  }

一个主管:
class HttpSupervisor extends Actor with ActorLogging with RouterHelper {

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5) {
      case _: ResourceUnavailableException   => Restart
      case _: Exception                      => Escalate
    }

  var router = makeRouter[HttpActor](5)

  def receive = LoggingReceive {
    case g: GetRequest =>
      router.route(g, sender())

    case Terminated(a) =>
      router = router.removeRoutee(a)
      val r = context.actorOf(Props[HttpActor])
      context watch r
      router = router.addRoutee(r)
  }
}

这里的重点是什么?

如果我的doRequest方法抛出了ResourceUnavailableException,监督者会接收到并重新启动演员,强制它在一定时间后重新发送消息,根据调度程序的要求。我看到的优势是我可以免费获得重试次数和处理异常本身的好方法。

现在看看博客,他展示了另一种方法,如果需要重试,只需像这样发送消息:

def receive = {
  case EspressoRequest =>
    val receipt = register ? Transaction(Espresso)
    receipt.map((EspressoCup(Filled), _)).recover {
      case _: AskTimeoutException => ComebackLater
    } pipeTo(sender)

  case ClosingTime => context.system.shutdown()
}

FutureAskTimeoutException 情况下,他将结果作为一个 ComebackLater 对象传输,他将通过以下方式处理它:
case ComebackLater =>
      log.info("grumble, grumble")
      context.system.scheduler.scheduleOnce(300.millis) {
        coffeeSource ! EspressoRequest
      }

对我来说,这基本上就是您可以使用策略监督员的全部内容,但是需要手动完成,没有内置的重试逻辑次数。

那么这里最好的方法是什么,为什么?我的使用akka监督策略的概念完全错误吗?
1个回答

4
您可以使用BackoffSupervisor

akka.pattern.BackoffSupervisor作为内置模式提供,实现所谓的指数回退监督策略,当子actor失败时再次启动,每次重启之间的时间延迟逐渐增长。

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = "myEcho",
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly 
  ).withAutoReset(10.seconds) // the child must send BackoffSupervisor.Reset to its parent 
  .withSupervisorStrategy(
    OneForOneStrategy() {
      case _: MyException => SupervisorStrategy.Restart
      case _ => SupervisorStrategy.Escalate
    }))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接