按需获取Actor或创建Actor

23

我可以使用actorOf创建演员,并使用actorFor查找它们。现在,我想通过一些id:String获取一个演员,如果它不存在,则希望创建它。像这样:

  def getRCActor(id: String):ActorRef = {
    Logger.info("getting actor %s".format(id))
    var a = system.actorFor(id)
    if(a.isTerminated){
      Logger.info("actor is terminated, creating new one")
      return system.actorOf(Props[RC], id:String)
    }else{
      return a
    }
   }

但是这并不起作用,因为isTerminated总是为真,我会得到第二次调用时的actor name 1 is not unique!异常。我猜我在这里使用了错误的模式。有人能帮忙解决如何实现吗?我需要:

  • 按需创建演员
  • 按ID查找演员,如果不存在则创建演员
  • 能够销毁演员,因为我不知道是否还需要它们

我应该使用Dispatcher还是Router来实现呢?

解决方案 正如所建议的那样,我使用具体的Supervisor来保存可用的演员映射。可以要求它提供其中一个子级。

class RCSupervisor extends Actor {

  implicit val timeout = Timeout(1 second)
  var as = Map.empty[String, ActorRef]

  def getRCActor(id: String) = as get id getOrElse {
    val c = context actorOf Props[RC]
    as += id -> c
    context watch c
    Logger.info("created actor")
    c
  }

  def receive = {

    case Find(id) => {
      sender ! getRCActor(id)
    }

    case Terminated(ref) => {
      Logger.info("actor terminated")
      as = as filterNot { case (_, v) => v == ref }
    }
  }
}

他的伴生对象

object RCSupervisor {

  // this is specific to Playframework (Play's default actor system)
  var supervisor = Akka.system.actorOf(Props[RCSupervisor])

  implicit val timeout = Timeout(1 second)

  def findA(id: String): ActorRef = {
    val f = (supervisor ? Find(id))
    Await.result(f, timeout.duration).asInstanceOf[ActorRef]
  }
  ...
}
4个回答

14

我使用 Akka 不是很久,但演员的 创建者 默认情况下是它们的 监督者。因此,父级可以监听它们的终止;

var as = Map.empty[String, ActorRef] 
def getRCActor(id: String) = as get id getOrElse {
  val c = context actorOf Props[RC]
  as += id -> c
  context watch c
  c
}

但显然,您需要注意它们的终止;

def receive = {
  case Terminated(ref) => as = as filterNot { case (_, v) => v == ref }

那是一个解决方案吗?我必须说我并没有完全理解你所说的 "已终止始终为真 => 演员名称1不唯一!" 的意思。


是的,这是一个解决方案。我试图避免创建自己的注册表(Map),并使用Actor Path来实现。但是我计划实现一个监管者,似乎没有其他方法。一些背景:我正在使用Playframework和他们提供的上下文。 - martin
谢谢你的回答。我从来没有想到父级演员会看到“终止”消息。这完全合理,但是我一直纠结于必须在子级上进行“监视”以处理终止(这样做没多大意义)... - jxstanford
看起来是一个可以的解决方案,但我不太喜欢在并发处理中使用varmutable.Map是一个稍微好一点的选择吗? - Ashesh

13

只能由它们的父级创建Actor。根据您的描述,我认为您正在尝试让系统创建非顶级Actor,这将始终失败。您应该向父级发送一个消息,说“把那个子代给我”,然后父级可以检查它当前是否存在,是否处于良好状态等,可能创建一个新对象,然后用适当的结果消息进行回复。

再次强调这一极其重要的点:get-or-create只能由直接父级完成。


你是对的。我选择了 oxbow_lakes 的答案,因为他提供了一些代码示例。请看我在那里的评论。谢谢。 - martin

2
我基于oxbow_lakes的代码/建议解决了这个问题,但是我没有创建所有子级演员的简单集合,而是使用了一个(双向)映射,如果子级演员的数量很大,则可能会有益处。
import play.api._
import akka.actor._
import scala.collection.mutable.Map 

trait ResponsibleActor[K] extends Actor {
  val keyActorRefMap: Map[K, ActorRef] = Map[K, ActorRef]()
  val actorRefKeyMap: Map[ActorRef, K] = Map[ActorRef, K]()

  def getOrCreateActor(key: K, props: => Props, name: => String): ActorRef = {
    keyActorRefMap get key match {
      case Some(ar) => ar
      case None =>  {
        val newRef: ActorRef = context.actorOf(props, name)
        //newRef shouldn't be present in the map already (if the key is different)
        actorRefKeyMap get newRef match{
          case Some(x) => throw new Exception{}
          case None =>
        }
        keyActorRefMap += Tuple2(key, newRef)
        actorRefKeyMap += Tuple2(newRef, key)
        newRef
      }
    }
  }

  def getOrCreateActorSimple(key: K, props: => Props): ActorRef = getOrCreateActor(key, props, key.toString)

  /**
   * method analogous to Actor's receive. Any subclasses should implement this method to handle all messages
   * except for the Terminate(ref) message passed from children
   */
  def responsibleReceive: Receive

  def receive: Receive = {
    case Terminated(ref) => {
      //removing both key and actor ref from both maps
      val pr: Option[Tuple2[K, ActorRef]] = for{
        key <- actorRefKeyMap.get(ref)
        reref <- keyActorRefMap.get(key)
      } yield (key, reref)

      pr match {
        case None => //error
        case Some((key, reref)) => {
          actorRefKeyMap -= ref
          keyActorRefMap -= key
        }
      }
    }
    case sth => responsibleReceive(sth)
  }
}

要使用此功能,您需要继承自ResponsibleActor并实现responsibleReceive方法。注意:此代码尚未经过彻底测试,可能仍存在一些问题。为了提高可读性,我省略了一些错误处理。


0

目前,您可以在Akka中使用Guice依赖注入,具体方法请参考http://www.lightbend.com/activator/template/activator-akka-scala-guice。您需要为Actor创建一个相应的模块,在其configure方法中,您需要创建一个命名绑定到Actor类和一些属性。这些属性可以来自配置文件,例如,为Actor配置路由器。您也可以通过编程方式将路由器配置放在其中。无论何时需要引用Actor,您都可以使用@Named("actorname")进行注入。配置的路由器将在需要时创建Actor实例。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接