Akka Actors - 创建Actor池

3

我用Scala创建了以下Akka Actor代码。当创建单个workerActor时,代码运行良好。但是,当我尝试使用轮询逻辑创建一组worker actor时,代码会悄悄地失败。有什么办法可以解决这个问题吗?如何打印更多的调试信息?

import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer

import akka.actor.actorRef2Scala
import akka.actor.ActorSystem
import akka.actor.Props
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout
import akka.actor._
import org.junit._
import org.junit.Assert._
import messaging.actors._
import akka.routing.RoundRobinRouter
import akka.routing._

class MainEngineActorTest {

  @Test
  def testMainActor () = {
  val _system = ActorSystem("MainEngineActor")
  val master = _system.actorOf(Props[MainEngineActor], name = "EngineActor")

  println ("Created Main Engine Actor")


  implicit val timeout = Timeout(5 seconds)

  val userID = new UserID ("test1")

  println ("Sending messages")

  for (i <- ( 1 to 10)) {
      master ! "Hello"
      master ! "World"
  }

  }
}

class MainEngineActor extends Actor with ActorLogging{

  // works if we create only a single workerActor
  //val workerActors = context.actorOf(Props[WorkerActor], name = "WorkerActors")

  // Doesn't work when we create a pool of worker actors - how do we fix this? 
  // why doesn't this work and why aren't any error messages printed?
  val workerActors = context.actorOf(RoundRobinPool(5).props(Props[WorkerActor]), name = "WorkerActors")

   def receive: Receive = {     
     case request => {       
       workerActors forward request
     }    
  }
}  

class WorkerActor extends Actor {

   def receive: Receive = {               
     case request => {
       println ("RequestReceived =" + request)
     }
   }
}

“静默失败”是什么意思?你有任何输出吗? - Ryan
我期望这些actor打印出"RequestReceived=World RequestReceived=Hello",但是它们没有输出任何内容。然而,如果我只创建一个actor(而不是一个actor pool),一切都可以正常工作。 - user3482479
如果您发布使用的Akka版本,将会很有帮助。自Akka 2.3以来,路由器初始化语法已更改。 - pushy
1个回答

5

试着用以下方式创建你的池:

val workerActors = context.actorOf(Props[WorkerActor].withRouter(RoundRobinPool(5)), name = "WorkerActors")

此外,当将此程序作为Junit测试运行时,程序在子actor有机会接收消息之前就已经终止了。我通过在将HelloWorld消息发送到master的循环后添加Thread.sleep(5000)来验证这一点。然后,我稍微调整了您的代码以使用Akka TestKit中的 TestActorRef ,这将强制所有内容都使用CallingThreadDispatcher以实现测试期间的同步执行,这样一切都能按预期运行。我更改的两行代码如下:
implicit val _system = ActorSystem("MainEngineActor")
val master = TestActorRef(new MainEngineActor())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接