在Akka Actors中使用Futures

17
我刚开始学习使用 Scala 中的 Akka Actors。我的理解是,Actor 接收到的消息会排队在 Actor 的邮箱中,并逐个进行处理。通过逐个处理消息,可以减轻并发问题(竞争条件和死锁)。
但是如果 Actor 创建一个 future 来完成与消息相关联的工作会怎样呢?由于 future 是异步的,当与前一个消息相关联的 future 仍在运行时,Actor 可以开始处理接下来的几个消息。这难道不可能造成竞争条件吗?如何在 Actor 的 receive() 方法内安全地使用 futures 执行长时间运行的任务?

你是说与未来相关的操作作用于演员内部的变量吗?那么是的,这是一种竞态条件。 - Alexei Kaigorodov
3个回答

31

在Actor内部使用Future最安全的方式是只使用pipeTo方法,在将其结果作为消息发送给一个Actor(可能是同一个Actor)。

import akka.pattern.pipe

object MyActor {

  def doItAsynchronously(implicit ec: ExecutionContext): Future[DoItResult] = { 
  /* ... */ 
  }

}

class MyActor extends Actor {

  import MyActor._    
  import context.dispatcher  

  def receive = {
    case DoIt =>
      doItAsynchronously.pipeTo(self)
    case DoItResult =>
       // Got a result from doing it
  }

}

这可以确保您不会在演员内部改变任何状态。


3
+1。如果这个回答没有表明清楚,那么 doItAsynchronously 不应该改变 actor 的任何状态。事实上,如果你可以将该函数移动到伴生对象(或任何其他不能访问 actor 的 this 的地方),那么更容易确保不会改变状态。 - Rob Starling
虽然这个模式很有趣,但它并没有真正回答这个问题:由于未来是异步的,因此 Actor 可以在前一个消息关联的未来仍在运行时开始处理下几个消息。这样做是否可能会创建竞争条件?它只是暗示现在工作确实是并行完成的,并且可能会导致竞态条件。 - Philluminati

0

请记住以下两点:

  1. “Future”(一个特殊的Actor)背后的概念是,我们可以在(结果)正在计算、启动或结束的同时为任何结果创建一个Actor,但我们无法为该特殊Actor或Future创建地址。

  2. 假设我想买什么东西(我的结果是购买某物,而流程是启动开始购买程序的步骤),我们可以为结果(购买)创建一个Actor,但如果出现任何问题,我们将得到一个异常而不是结果。

现在,如何将上述两个问题结合起来解释如下:

假设我们需要计算10亿的阶乘,我们可能认为计算需要很长时间,但我们立即获得了Future,因此生成Future不需要时间("由于future是异步的,因此Actor在前一个消息关联的future仍在运行时可以开始处理下几个消息。")。接下来,我们可以传递、存储和分配这个Future。

希望您能理解我的意思。

来源:https://www.brianstorti.com/the-actor-model/


-1

如果你需要在future中改变状态而不阻塞传入的消息,你可能需要重新设计你的actor模型。我会为每个任务引入单独的actor,并在这些future上使用它们。毕竟,演员的主要任务是维护它的状态而不让它逃逸,从而提供安全并发性。为那些长时间运行的任务定义一个actor,其责任只是照顾它。

与手动处理状态相比,您可能希望考虑使用akka's FSM,因此您可以更清晰地了解何时发生变化。当我处理更复杂的系统时,我个人更喜欢使用这种方法,而不是丑陋的变量。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接