如何每5分钟执行一次Akka actor?

45

我想知道 Akka 中是否有任何机制可以定期执行 actor?


2
这里我刚找到一个完美的解决方案:http://doc.akka.io/docs/akka/2.0/scala/scheduler.html - Evans Y.
4个回答

67

在 Akka 1.3.1 中,你不需要一个 actor 就能实现这个功能,你可以通过以下方式调度一个函数,使其每隔 5 分钟被调用:

Scheduler.schedule(() => println("Do something"), 0L, 5L, TimeUnit.MINUTES)

然而,如果你想让它成为一个Actor,出于其他原因,你应该像这样调用它

case class Message()

val actor = actorOf(new Actor {
  def receive = {
    case Message() => println("Do something in actor")
  }
}).start()

Scheduler.schedule(actor, Message(), 0L, 5L, TimeUnit.MINUTES)

如果你正在使用Akka 2.0,则应该像这样完成

val system = ActorSystem("MySystem")
system.scheduler.schedule(0 seconds, 5 minutes)(println("do something"))

或者像这样每5分钟向一个Actor发送一条消息:

case class Message()
class MyActor extends Actor {
  def receive = { case Message() => println("Do something in actor") }
}

val system = ActorSystem("MySystem")
val actor = system.actorOf(Props(new MyActor), name = "actor")
system.scheduler.schedule(0 seconds, 5 minutes, actor, Message())

3
Scheduler.schedule似乎不再存在。 - Phil
2
请记住,在新的调用在5分钟后触发之前,您可能希望知道上一个调用是否已经完成了其过程,至少在某些情况下是这样。为此,您可以使用scheduleOnce和一些额外的逻辑。 - matanster
如果你想从Actor内部执行此操作,可以调用context.system.scheduler.schedule(<params>)来进行调度工作。 - NateH06

22

使用定时器方式是一个不错的方法,但如果在定时器触发后需要执行的工作量太大,会导致消息队列积压。如果您希望定时器在一次迭代结束和下一次迭代开始之间发生,请使用以下模式的scheduleOnce

import akka.actor.Actor
import scala.concurrent.duration._

class SchedulingActor extends Actor {

  override def preStart(): Unit = {
    self ! "Do Some Work"
  }

  def receive = {
    case "Do Some Work" => 
      doWork
      context.system.scheduler.scheduleOnce(5 minutes, self, "Do Some Work")
  }

  def doWork = ???
}

2
这是截至2016年的相关答案。Scheduler.schedule不再是有效的方法。 - Zee
你能不能直接使用 context.system.scheduler.schedule(...) 每5分钟向 self 发送消息呢?这样看起来更简洁,也消除了覆盖 preStart 的需要。 - Branislav Lazic
@BranislavLazic 不,不设置定期计划的整个目的是避免在处理逻辑需要超出每个预定消息之间的时间间隔时向演员发送另一条消息。使用scheduleOnce(),演员可以花费尽可能长的时间来完成所需的任何工作,然后在安全的情况下设置另一个计划以向自身发送未来的消息。对于某些情况,这可以避免死锁和其他并发问题。 - speby

3

更完整的Java示例:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;

public class AnActor extends AbstractActor {
    private final FiniteDuration SCHEDULED_WORK_DELAY = new FiniteDuration(5, TimeUnit.MINUTES);

    @Override
    public void preStart() {
        getSelf().tell("Do Scheduled Work", ActorRef.noSender());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
        .matchEquals("Do Scheduled Work", work -> {
            doScheduledWork();
         context().system().scheduler().scheduleOnce(SCHEDULED_WORK_DELAY, getSelf(),
                "Do Scheduled Work", context().dispatcher(), ActorRef.noSender());
        })
        .build();
    }

    private void doScheduledWork() { ... }
}

2
如果有人需要Java代码,他们可以按照以下方式进行操作:
    Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), Duration.create(5, TimeUnit.MINUTES), cronActor, "tick", system.dispatcher(), null);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接