我有一个演员(名为Worker),它向3个其他演员(名为Filter1、Filter2、Filter3)发送相同的消息。
每个过滤器都有一个随机时间来解决此操作。然后,在Worker演员中,我使用ask模式并等待未来成功:
class Worker2 extends Actor with ActorLogging {
val filter1 = context.actorOf(Props[Filter1], "filter1")
val filter2 = context.actorOf(Props[Filter2], "filter2")
val filter3 = context.actorOf(Props[Filter3], "filter3")
implicit val timeout = Timeout(100.seconds)
def receive = {
case Work(t) =>
val futureF3 = (filter3 ? Work(false)).mapTo[Response]
val futureF2 = (filter2 ? Work(true)).mapTo[Response]
val futureF1 = (filter1 ? Work(true)).mapTo[Response]
val aggResult: Future[Boolean] =
for {
f3 <- futureF3
f2 <- futureF2
f1 <- futureF1
} yield f1.reponse && f2.reponse && f3.reponse
if (Await.result(aggResult, timeout.duration)) {
log.info("Response: true")
sender ! Response(true)
} else {
log.info("Response: false")
sender ! Response(false)
}
}
}
如果任何一个过滤器Actor返回false,那么我就不需要其他答案。例如,如果我并行运行3个过滤器Actor中的一个,如果在某种情况下,Filter1返回false,则问题得到解决,我不需要Filter2和Filter3的答案。
在这段代码中,我总是需要等待3个执行才能决定,这似乎是不必要的。有没有一种设置短路的方法?