我尝试使用system.shutdown()来等待所有演员完成,但是在此命令之后,演员已经终止并且无法响应检索计算结果所需的任何消息。
我想到的另一件事是向工作人员演员发送像JobCompleted这样的消息,然后可以计算我收到多少这种类型的消息,如果计数等于生成的工作者演员,则知道它们全部完成。但是我不知道演员内部发生故障时会发生什么。此方法还显得太笨重。
可能的重复问题:
最简单的方法:使用ask模式并等待所有返回的futures。可以在for块中或通过Future.sequence实现。(更多信息请参见官方文档)
此外,如果您需要在任务完成后立即终止 - 有许多不同的方法。例如,请参见http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2
我会有一个协调员Actor,它会生成工作者并向每个工作者发送指定的工作消息,构建一个包含每个工作者ActorRefs的集合。当在协调员中接收到JobCompleted(或适当时的JobFailed)消息时,累积结果并从集合中删除该消息的发送者(即Worker的ActorRef)。当集合为空时,所有工作者都已完成。工作者们可以在发送完他们的消息后调用context.stop(self)
来终止自己。
进一步的语义将取决于工作的具体情况。例如,协调器可以设置一个定时回调到自身(例如,调用context.system.scheduler.schedule(someDelay, someDelay, self, ResendWorkOrders)
将在每个someDelay
间隔内向自身发送一个ResendWorkOrders消息)。当它收到该调用时,它可以重新发送工作请求给集合中剩余的每个Worker(甚至可以重新生成它们)。当集合为空(所有的Worker都已完成)时,可以取消调度(调度器的调用返回一个Cancellable
)。这可以处理例如,在将消息传递给Worker时可能发生失败的情况(例如,在分布式系统中),或者Worker可能在没有回复给协调器的情况下失败或出错。
val myActorRef = system.actorOf(Props(classOf[WorkerActor]), "worker")
val result = (myActorRef ? SomeComputationMessage()).mapTo[Result]
sender ! Result()
将其结果发送回调用者,则此请求模式将返回一个带有结果的 Future。