在一个actor中使用Akka调度器是否可行?

28

我希望有一个让演员暂时休眠的可能性。演员应该自己决定要睡多久。由于Thread.sleep()不是推荐的方法,所以我考虑使用akka中的调度程序。因此,我定义了一个演员,另一个演员可以注册以被唤醒。

class Scheduler extends Actor {

  def receive = {
    case Sleep(duration) => context.system.scheduler.scheduleOnce(duration) {
      sender ! Ring
    }
  }
}

但是发送消息的Actor从未收到Ring消息。

  • 在Actor内部使用调度器是否推荐?
  • 为什么发送消息的Actor从未接收到Ring消息?
  • 如果这不可行,解决该问题的推荐方法是什么?
3个回答

45

首先回答标题问题:是的,在一个actor内部使用调度器是可能的。

case Sleep(duration) =>
  context.system.scheduler.scheduleOnce(duration, self, Ring)

问题背后的问题

您没有说明实际想要达到的目标,因此我在这里做出一个有根据的猜测:您希望执行通常称为“X”的操作的演员暂时执行称为“Y”的操作,暂停“X”活动。完整解决方案如下:

class Sleepy extends Actor {
  def receive = {

    ... // cases doing “X”

    case Sleep(duration) =>
      case object WakeUp
      context.system.scheduler.scheduleOnce(duration, self, WakeUp)
      context.become({
        case WakeUp => context.unbecome()
        // drop the rest
      }, discardOld = false)
  }
}

使用FSM特性可以实现同样的功能,在正常状态和休眠状态之间进行切换。当然,在睡眠时你也可以做任何你想做的事情,例如在Akka 2.1中混入Stash特性,并在睡眠时对所有(或某些)消息调用stash(),在收到WakeUp消息时调用unstashAll();或者你也可以完全做其他的事情。角色非常灵活。

演员不会做什么

演员永远不会真正地睡觉,它们总是处理传入的消息。如上所示,您可以定义其含义,但基本原则是您无法挂起演员以使其不处理其邮箱中的消息。


是的,我想正是这样做。使用become()unbecome()似乎是一个不错的解决方案。谢谢。 - Felix Reckers

16

在您传递给调度程序的闭包中,您正在封闭“sender”。这意味着Ring消息很可能被发送到错误的actor。您应该执行以下操作:

case Sleep(duration) => 
  val s = sender
  context.system.scheduler.scheduleOnce(duration) {
    s ! Ring
  }
}

0
Roland Kuhn的回答涵盖了这个主题,我想补充一下还有另一个常见的使用情况:当向不同的actor发送消息并等待该actor响应时,限制等待时间是非常常见的。在这种情况下,使用调度程序是很有用的。
otherActor ! Request(...)
context.system.scheduler.scheduleOnce(duration, self, WakeUp)
...
case Response(...) => ...
case WakeUp => context stop self

1
如果您只需要在接收消息时设置超时,请使用 receiveTimeout。 - Viktor Klang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接