Akka和并发Actor执行

3

我有一个演员(名为Worker),它向3个其他演员(名为Filter1、Filter2、Filter3)发送相同的消息。

每个过滤器都有一个随机时间来解决此操作。然后,在Worker演员中,我使用ask模式并等待未来成功:

class Worker2 extends Actor with ActorLogging {

  val filter1 = context.actorOf(Props[Filter1], "filter1")
  val filter2 = context.actorOf(Props[Filter2], "filter2")
  val filter3 = context.actorOf(Props[Filter3], "filter3")

  implicit val timeout = Timeout(100.seconds)

  def receive = {
    case Work(t) =>

      val futureF3 = (filter3 ? Work(false)).mapTo[Response]
      val futureF2 = (filter2 ? Work(true)).mapTo[Response]
      val futureF1 = (filter1 ? Work(true)).mapTo[Response]

      val aggResult: Future[Boolean] =
        for {
          f3 <- futureF3
          f2 <- futureF2
          f1 <- futureF1
        } yield f1.reponse && f2.reponse && f3.reponse

      if (Await.result(aggResult, timeout.duration)) {
        log.info("Response: true")
        sender ! Response(true)
      } else {
        log.info("Response: false")
        sender ! Response(false)
      }
  }
}

如果任何一个过滤器Actor返回false,那么我就不需要其他答案。例如,如果我并行运行3个过滤器Actor中的一个,如果在某种情况下,Filter1返回false,则问题得到解决,我不需要Filter2和Filter3的答案。
在这段代码中,我总是需要等待3个执行才能决定,这似乎是不必要的。有没有一种设置短路的方法?
2个回答

7
这个问题的解决方案是使用Future.find() -- Scaladoc Here 你可以像这样解决它:
val failed = Future.find([f1,f2,f3]) { res => !res }
Await.result(failed, timeout.duration) match {
    None => // Success
    _ => // Failed
}

Future.find()会返回第一个完成且符合谓词的future。如果所有的future都已经完成,而且没有结果符合谓词,则返回None。

编辑:

更好的解决方案是完全避免阻塞,并使用akka管道功能将结果直接传送到发送者,当找到响应时,这样就不会使用此actor阻塞线程:

import akka.pattern.pipe

val failed = Future.find([f1,f2,f3]) { res => !res }
val senderRef = sender
failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef)

在getOrElse(true)部分,如果我们像以前一样找到了future,则结果为false,否则我们返回true。

+1,这是一个可靠的解决方案。但请记住,其他未来任务仍将继续运行直至完成,只是您不必等待它们。 - cmbaxter
太好了!它运行得非常顺畅。对于其他的功能,我正在考虑添加一个取消任务和一个优先收件箱。我的意思是,在使用false解决后,我向所有参与者发送一个带有任务ID的取消消息。由于此消息具有最高优先级,如果未处理该消息,则将任务ID存储在Actor状态中。然后,当任务到达时,我会检查该ID。如果存在,则完全忽略它。通过这种方式,我可以避免执行昂贵的任务。 - German

1
我认为你想要的是,如果响应为true,就过滤未来。由于for表达式的工作方式,它会短路并不会等待其余的未来完成以组装响应。它仍将返回一个失败的未来,其中包含MatchError异常(参见[1]),您需要使用onFailure处理程序来处理它。
所以
val aggResult = for {
  f3 <- futureF3 if (f3.response)
  f2 <- futureF2 if (f2.response)
  f1 <- futureF1 if (f1.response)
} yield f1.reponse && f2.reponse && f3.reponse

aggResult.onFailure { case MatchError => sender ! false } 

[1] : https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ

[1]: https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ

不完全是这样...最好在for循环中的yield部分看看。如果其中一个为false,那么等待其他的就没有意义了,因为答案是false!无论如何,你的代码比我的好多了!:) - German
公平的。Bryan的解决方案要好得多。 - Ratan Sebastian

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接