真正伟大的介绍"The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html。
Actor 接收消息,将阻塞代码包装到 Future 中,在它的 Future.onSuccess 方法中 - 使用其他异步消息发送结果。但要注意 sender 变量可能会改变,所以要关闭它(在 future 对象中创建一个本地引用)。
p.s.:The Neophyte's Guide to Scala - 真正很棒的书。
更新:(添加示例代码)
我们有工人和经理。经理设置要完成的工作,工人报告“收到”,并开始长时间的过程(休眠1000)。同时,系统用消息“alive”向经理发送 ping,并向工人发送 ping。完成工作时 - 工人通知经理。
NB:执行 sleep 1000 在导入的“默认/全局”线程池执行器中完成 - 您可能会遇到线程饥饿。
NB:val commander = sender 需要“关闭”对原始 sender 的引用,因为当 onSuccess 将被执行时 - actor 中的当前 sender 可能已经设置为其他 'sender' ...
日志:
01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!
代码:
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global
object Sample {
private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")
def printWithTime(msg: String) = {
println(fmt.format(new Date()) + " " + msg)
}
class WorkerActor extends Actor {
protected def receive = {
case "now" =>
val commander = sender
printWithTime("worker: got command")
future {
printWithTime("worker: started")
Thread.sleep(1000)
printWithTime("worker: done")
}(ExecutionContext.Implicits.global) onSuccess {
case _ => commander ! "done"
}
commander ! "working"
case "alive?" =>
printWithTime("worker: alive")
}
}
class ManagerActor(worker: ActorRef) extends Actor {
protected def receive = {
case "do" =>
worker ! "now"
printWithTime("manager: flush sent")
case "working" =>
printWithTime("manager: resource allocated")
case "done" =>
printWithTime("manager: work is done")
case "alive?" =>
printWithTime("manager alive")
worker ! "alive?"
}
}
def main(args: Array[String]) {
val config = ConfigFactory.parseString("" +
"akka.loglevel=DEBUG\n" +
"akka.debug.lifecycle=on\n" +
"akka.debug.receive=on\n" +
"akka.debug.event-stream=on\n" +
"akka.debug.unhandled=on\n" +
""
)
val system = ActorSystem("mine", config)
val actor1 = system.actorOf(Props[WorkerActor], "worker")
val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")
actor2 ! "do"
actor2 ! "alive?"
actor2 ! "alive?"
actor2 ! "alive?"
printWithTime("Humming ...")
Thread.sleep(5000)
printWithTime("Shutdown!")
system.shutdown()
}
}