我该如何设置Akka Actor容错?

8
我正在尝试在Akka Actors中实现容错行为。我正在处理一些依赖于系统中Actor长时间运行的代码。我发现我的处理停止了几个小时(应该需要大约10个小时),并且没有太多的进展。我认为我的Actors无法从异常中恢复。
我需要做什么才能永久地使Actors按照一对一的方式重新启动? 我期望可以从这份文档http://akka.io/docs/akka/1.1.3/scala/fault-tolerance中完成此操作。
我正在使用akka 1.1.3和scala 2.9。
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

这段代码在负载均衡器后设置了5个Actor,这些Actor会打印发送给它们的整数,但是如果收到偶数则会抛出异常以模拟故障。将整数0至200发送给这些Actor。我期望奇数会被输出,但在几个偶数故障后,所有内容似乎都停止了。在使用sbt运行此代码时,输出如下:

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

我认为这里发生的情况是5个演员开始表演,前五个偶数将它们淘汰并且它们无法重新开始。
如何更改此代码以使演员从异常中恢复?
我预计这实际上会打印出1到200之间所有的奇数。我认为每个演员都会在偶数上失败,但在异常时会被重启并保留其邮箱。我期望看到preRestart和postRestart的println。在此代码示例中需要进行哪些配置才能实现这些内容?
以下是关于akka和演员的其他假设,这可能导致我的误解。我假设可以使用监管者或故障处理程序对演员进行配置,以便在接收期间抛出异常时重新启动并继续可用。如果演员在接收期间抛出异常,则我假设发送给演员的消息将丢失。我假设调用抛出异常的演员的preRestart()和postRestart()方法。
代码示例代表了我正在尝试做的事情,并基于Why is my Dispatching on Actors scaled down in Akka?

**另一个代码示例**

这是另一个更简单的代码示例。我正在启动一个演员,该演员在偶数时抛出异常,没有负载均衡器或其他东西阻碍。我尝试打印有关演员的信息。在消息发送到演员后,我等待一分钟退出程序并监视发生了什么。

我希望这将打印出奇数,但看起来演员坐在其邮箱中的消息周围。

我设置了OneForOneStrategy吗?我需要将Actor链接到某些内容吗?我的配置方式基本上是错的吗?是否需要设置具有容错能力的调度程序?我会弄乱调度程序中的线程吗?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

我得到的输出如下:
[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM
2个回答

5
问题出在我的 akka.conf 文件上。我使用的是参考 1.1.3 版本的 akka.conf 文件,除了配置事件处理程序的那一行。 我的(有问题的)如下:
    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

参考文献 1.1.3(可用的那一个):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]

通过我的事件处理程序配置行,演员重启不会发生。而使用1.1.3版本的参考线路则能够很好地实现重启。
我根据这些说明进行了更改:http://akka.io/docs/akka/1.1.3/general/slf4j.html 因此,通过摆脱该页面中的建议并返回1.1.3版本的akka.conf,我能够获得容错的演员。

1

我相信你的问题是在消息发送后终止了,你没有尝试保持异步应用程序的活动状态,因此主线程退出并将所有内容关闭。


如果我在main()的末尾添加Thread.sleep(100000),则会得到以下输出:[info] Running FaultTest starting FaultTest.main() Loading config [akka.conf] from the application classpath. Number of Actors: 5 Actor: 0 Received: 1 Actor: 4 Received: 3 Actor: 1 Received: 7 Actor: 1 Received: 9,输出会暂停,但是其他数字不会打印。我没有等待应用程序退出,但是30-40秒后什么也没有发生。此外,如果我删除故障,则数字将非常快地打印出来,在不到2秒的时间内完成。 - Brian C.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接