使用Akka的Actors进行异步消息处理

3
在我的项目中,我使用Akka的Actors。根据定义,Actors是线程安全的,这意味着在Actor的receive方法中,它们可以处理多个消息而不会出现竞争条件。
def receive = {
    case msg =>
        // some logic here
}

每次只有一个线程处理被注释的代码块。然而,当该代码为异步时,情况开始变得更加复杂:

def receive = {
    case msg =>
        Future {
            // some logic here
        }
}

如果我理解正确的话,在这种情况下只有Future构造将被同步,而不是Future内部的逻辑。
当然,我可以阻止Future:
def receive = {
    case msg =>
        val future = Future {
            // some logic here
        }
        Await.result(future, 10.seconds)
}

这个方案虽然解决了问题,但我认为我们都应该达成一个共识:这几乎不是一个可以接受的解决方案。

因此我的问题是:如何在异步计算的情况下保持Actor的线程安全性,而不会阻塞Scala的Futures?


只需按照http://docs.scala-lang.org/overviews/core/futures.html中所述的方式,将回调添加到Future中。回调应该向此或另一个actor发送消息。 - Alexei Kaigorodov
3个回答

6
如何在异步计算中保持Actor的线程安全性而不阻塞Scala的Future?
如果你在 Future 内部修改 Actor 的内部状态,那么这个假设只在第一次看起来是正确的,其实是设计上的问题。将数据复制并通过 `pipeTo` 将计算结果传递给 Actor,仅使用 Future 进行计算。一旦 Actor 接收到计算结果,就可以安全地对其进行操作。
import akka.pattern.pipe

case class ComputationResult(s: String)

def receive = {
  case ComputationResult(s) => // modify internal state here
  case msg =>
    Future {
       // Compute here, don't modify state
       ComputationResult("finished computing")
    }.pipeTo(self)
}

实际上,Future内部的逻辑涉及Mongo数据库。我们正在使用ReactiveMongo,它是异步的(因此出现了所描述的问题)。因此,我必须确保下一条消息在前一条消息被处理后才被处理,这意味着对数据库的异步调用应该已经被解决。 - Sergey Volkov
1
如果这是问题的话,那么这与Akka actors无关,而是关于允许你的数据库在处理下一个事务之前完成当前事务。在这种情况下,你可以实现一个阻塞队列逻辑,只有在完成后才会处理队列中的下一项。 - Yuval Itzchakov

0

这里最简单的解决方案是将演员转换为状态机(使用AkkaFSM),并执行以下操作:

  • 为mongoDB请求分派一个future。
  • 使用自己的演员引用与自己的演员通信。
  • 将来自future的消息告诉回去。

根据上下文,您可能需要做更多工作才能获得适当的响应。

但这样做的好处是您可以使用演员状态处理消息,并且可以随意更改演员状态,因为您拥有该线程。


0

我认为你需要先“解析”数据库查询,然后使用结果返回一个新的Future。如果数据库查询返回一个Future[A],那么你可以使用flatMap操作A并返回一个新的Future。大致如下:

def receive = {
    case msg =>
        val futureResult: Future[Result] = ...
        futureResult.flatMap { result: Result =>
            // ....
            // return a new Future
        }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接