如何检测Akka演员(actors)何时完成?

4
我正在尝试实现一个简单的应用程序,其中我将不同的任务分配给Akka演员,并让它们独立地计算结果。问题在于如何检测它们何时完成了分配的工作(成功或失败次数),因为我需要从它们那里获得计算结果。一般采用什么方法?
我尝试使用system.shutdown()来等待所有演员完成,但是在此命令之后,演员已经终止并且无法响应检索计算结果所需的任何消息。
我想到的另一件事是向工作人员演员发送像JobCompleted这样的消息,然后可以计算我收到多少这种类型的消息,如果计数等于生成的工作者演员,则知道它们全部完成。但是我不知道演员内部发生故障时会发生什么。此方法还显得太笨重。

1
可能是 Knowing when akka actors are finished 的重复问题。 - Daenyth
你应该使用“ask”模式来回复,使用父级actor本身观察子级actor或使用“Reaper”模式。 - Johny T Koshy
3个回答

2

0

我会有一个协调员Actor,它会生成工作者并向每个工作者发送指定的工作消息,构建一个包含每个工作者ActorRefs的集合。当在协调员中接收到JobCompleted(或适当时的JobFailed)消息时,累积结果并从集合中删除该消息的发送者(即Worker的ActorRef)。当集合为空时,所有工作者都已完成。工作者们可以在发送完他们的消息后调用context.stop(self)来终止自己。

进一步的语义将取决于工作的具体情况。例如,协调器可以设置一个定时回调到自身(例如,调用context.system.scheduler.schedule(someDelay, someDelay, self, ResendWorkOrders)将在每个someDelay间隔内向自身发送一个ResendWorkOrders消息)。当它收到该调用时,它可以重新发送工作请求给集合中剩余的每个Worker(甚至可以重新生成它们)。当集合为空(所有的Worker都已完成)时,可以取消调度(调度器的调用返回一个Cancellable)。这可以处理例如,在将消息传递给Worker时可能发生失败的情况(例如,在分布式系统中),或者Worker可能在没有回复给协调器的情况下失败或出错。


0
当您创建“jobs”并将它们发送给工作人员时,可以使用ask模式创建结果的Future。
这看起来可能是这样的:
val myActorRef = system.actorOf(Props(classOf[WorkerActor]), "worker")

val result = (myActorRef ? SomeComputationMessage()).mapTo[Result]

如果演员使用 sender ! Result() 将其结果发送回调用者,则此请求模式将返回一个带有结果的 Future。
由于这些是结果的 Futures,您可以对它们进行映射,并根据成功/失败适当地处理它们。

http://doc.akka.io/api/akka/2.0/akka/pattern/package.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接