Akka演员、Futures和闭包

12
我在Akka文档中读到,从封闭的actor中关闭变量是危险的。

警告

在这种情况下,您需要小心避免关闭包含actor的引用,即不要从匿名Actor类内部调用封闭actor的方法。这将破坏actor的封装性,并可能引入同步错误和竞态条件,因为其他actor的代码将与封闭actor同时调度。

现在,我有两个actors,其中一个请求第二个actors的某些内容,并对结果执行某些操作。在下面的示例中,actor Accumulator从actor NumberGenerator检索数字并将它们相加,在此过程中报告总和。

这可以通过至少两种不同的方式完成,就像这个例子展示的那样,使用两个不同的receive函数(AB)。两者之间的区别在于,A没有关闭counter变量;相反,它等待一个整数并将其求和,而B创建了一个Future,该Future关闭counter并进行求和。如果我正确理解了这是如何工作的,这发生在一个匿名actor中,该actor只是为了处理onSuccess而创建。

import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}

在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用AtomicInteger代替Int,或者在某些网络场景中使用,例如netty,在threadsafe通道上发出写操作,但这不是我的重点。

冒昧地问一句:有没有办法让Future的onSuccess在Actor中执行,而不是在匿名中间Actor中执行,而不需要receive函数中定义一个case?

编辑

更明确地说,我的问题是:是否有一种方法强制一系列Futures在给定Actor的同一线程中运行?

2个回答

5
实现这种设计的最简单方法是使用“fire-and-forget”语义:
class Accumulator extends Actor {
  private[this] var counter = 0

  def receive = {
    case Start => ActorTest.genRef ! Request
    case x: Int => {
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    }
  }
}

该解决方案是完全异步的,您不需要任何超时设置。

是的,如果我放弃使用Futures,这个方法可以工作。在我的例子中,如果Futures以“_pipeTo self_”结尾,则可以轻松地链接它们,但是在使用fire-and-forget语义时不再可能。相反,我必须在累加器的接收函数中定义N个中间消息,以确保代码在此Actor的线程中运行。我想我可以再次询问,这次更清楚:是否有一种方法可以强制一系列Futures在给定Actor的同一线程中运行? - gsimard
为什么需要保证Accumulator Actor始终在同一线程中运行?这似乎违背了Actor模型的理念。同样适用于futures:它们应该在线程池上分派,以最大化性能。如果它们都在同一个线程中按顺序运行,那么你只有一个普通的顺序程序,你就不再需要futures了... - paradigmatic
实际上,它是否在同一线程中运行并不重要,要求更多的是按顺序运行,就像单个actor的消息被处理一样。这并不违反actor模型,它 就是 actor模型。 - gsimard
您可以使用Actor链接Future:只需使用?ask)而不是!tell):您将获得一个Future,您可以在其上进行mapfold操作:http://doc.akka.io/docs/akka/2.4.0/scala/futures.html - Max Heiber

5
问题在于 onSuccess 方法将在不同的线程中运行,而与该方法相关的Actor的receive方法将在另一个线程中运行。你可以使用 pipeTo 方法或使用一个Agent来解决这个问题。将 counter 定义为AtomicInteger 可以解决这个问题,但这并不是很优雅——也就是说,它违反了 Actor 模型。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接