我在Akka文档中读到,从封闭的actor中关闭变量是危险的。
现在,我有两个actors,其中一个请求第二个actors的某些内容,并对结果执行某些操作。在下面的示例中,actor Accumulator从actor NumberGenerator检索数字并将它们相加,在此过程中报告总和。警告
在这种情况下,您需要小心避免关闭包含actor的引用,即不要从匿名Actor类内部调用封闭actor的方法。这将破坏actor的封装性,并可能引入同步错误和竞态条件,因为其他actor的代码将与封闭actor同时调度。
这可以通过至少两种不同的方式完成,就像这个例子展示的那样,使用两个不同的receive函数(A与B)。两者之间的区别在于,A没有关闭counter变量;相反,它等待一个整数并将其求和,而B创建了一个Future,该Future关闭counter并进行求和。如果我正确理解了这是如何工作的,这发生在一个匿名actor中,该actor只是为了处理onSuccess而创建。
import com.esotericsoftware.minlog.Log
import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._
case object Start
case object Request
object ActorTest {
var wake = 0
val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")
Log.info("ActorTest", "Starting !")
accRef ! Start
}
class Accumulator extends Actor {
var counter = 0
implicit val timeout = Timeout(5 seconds)
// A: WITHOUT CLOSURE
def receive = {
case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
}
// B: WITH CLOSURE
def receive = {
case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
}
}
}
class NumberGenerator extends Actor {
val rand = new java.util.Random()
def receive = {
case Request => sender ! rand.nextInt(11)-5
}
}
在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用AtomicInteger代替Int,或者在某些网络场景中使用,例如netty,在threadsafe通道上发出写操作,但这不是我的重点。
冒昧地问一句:有没有办法让Future的onSuccess在此Actor中执行,而不是在匿名中间Actor中执行,而不需要在receive函数中定义一个case?
编辑
更明确地说,我的问题是:是否有一种方法强制一系列Futures在给定Actor的同一线程中运行?
Accumulator
Actor始终在同一线程中运行?这似乎违背了Actor模型的理念。同样适用于futures:它们应该在线程池上分派,以最大化性能。如果它们都在同一个线程中按顺序运行,那么你只有一个普通的顺序程序,你就不再需要futures了... - paradigmaticFuture
:只需使用?
(ask
)而不是!
(tell
):您将获得一个Future
,您可以在其上进行map
和fold
操作:http://doc.akka.io/docs/akka/2.4.0/scala/futures.html - Max Heiber