Akka调度模式

5

考虑经典的“单词计数”程序。它计算某个目录中所有文件中单词的数量。主节点接收到某个目录并将工作分配给Worker actors(每个worker处理一个文件)。这是伪代码:

class WordCountWorker extends Actor {

  def receive = {
    case FileToCount(fileName:String) =>
      val count = countWords(fileName)
      sender ! WordCount(fileName, count)
  }
}

class WordCountMaster extends Actor {
  def receive = {
    case StartCounting(docRoot) => // sending each file to worker
      val workers = createWorkers()
      fileNames = scanFiles(docRoot)
      sendToWorkers(fileNames, workers)
    case WordCount(fileName, count) => // aggregating results
      ...

  }
}

但我希望能够按计划运行此字数统计程序(例如每1分钟),并提供不同的目录进行扫描。

Akka为安排消息传递提供了不错的方式:

system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))

但是,以上调度程序的问题在于,当调度程序发送新消息时,但上一条消息尚未处理时(例如我发送消息以扫描某个大目录,1秒后我发送另一条消息以扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的WordCountMaster将从正在处理不同目录的工作人员接收到WordCount消息。

作为解决方法,我可以安排执行某些代码块的执行,该代码块将每次创建新的WordCountMaster。即一个目录=一个WordCountMaster。但是我认为这是低效的,并且我还需要注意为WordCountMaster提供唯一名称,以避免出现InvalidActorNameException

所以我的问题是:我应该像上面一段中提到的那样为每个tick创建新的WordCountMaster吗?还是有更好的想法/模式如何重新设计此程序以支持调度?


一些更新: 对于每个目录创建一个主actor,我有一些问题:

  1. 演员命名问题

InvalidActorNameException:actor name [WordCountMaster] is not unique!

InvalidActorNameException:actor name [WordCountWorker] is not unique!

我可以克服这个问题,只需不提供演员名称即可。但在这种情况下,我的演员会收到自动生成的名称,例如$a$b等。这对我来说不好。

  1. 配置问题:

我想将我的路由器配置排除在application.conf之外。也就是说,我想为每个WordCountWorker路由器提供相同的配置。但是,由于我无法控制演员名称,因此无法使用以下配置,因为我不知道演员名称:

  /wordCountWorker{
    router = smallest-mailbox-pool
    nr-of-instances = 5
    dispatcher = word-counter-dispatcher
  }

你能详细说明一下为什么接收旧的WordCount消息是个问题吗? - kybernetikos
如果我立即向我的WordCountMaster发送两条消息,分别带有不同的目录:StartCounting(directory_1)StartCounting(directory_2),那么WordCountMaster将会接收到来自不同目录的结果。也就是说,这些消息将包含来自不同目录的文件WordCount(fileName, count) - WelcomeTo
是的,但我想知道为什么这是一个问题。你不能从文件名中识别出目录吗? - kybernetikos
你能不能在演员名称中加入时间/时间步长? - Hadi
4个回答

4
我并不是Akka专家,但我认为每个聚合物都有一个actor的方法并不低效。您需要以某种方式将并发聚合分开。您可以为每个聚合分配一个id,在唯一的主actor中按id将它们分隔开,或者您可以使用Akka actor命名和生命周期逻辑,并将每个聚合委派给一个actor用于每个计数循环,该actor仅用于该聚合逻辑。
对我而言,每个聚合使用一个actor似乎更加优雅。
另请注意,Akka已经实现了如此处所述的聚合模式。

谢谢您的回复,我决定按照您的建议为每个聚合创建一个主actor。但是,您能否更深入地解释一下如何将聚合模式应用于WordCount程序?我阅读了这篇文章,但不清楚如何在我的情况下使用聚合模式。谢谢。 - WelcomeTo
正如所说,我不是专家,因此请谨慎考虑我的建议 :) 使用聚合模式,您可以在聚合主节点上迭代文件并创建工作节点。每个工作节点都会向主节点发送消息“WordCount”。在主节点中,您可以像这样拥有val handle = expect { case WordCount => hereYouProcessTheDataFromOneWorker; if (allWorkersAnswered) unexpect(handle); case TimeOut => handleTimeOut; unexpect(handle) }。请查看答案中链接中的示例。 - Gregor Raýman

3

个人而言,我不会使用演员来解决这个聚合问题,但无论如何,我们来试试。

我认为按照你的建议,没有合理的方法可以同时处理多个目录的单词计数。相反,您应该拥有一个“主-主”演员来监督计数器。因此,您需要三个演员类:

  • FileCounter:它接收要读取的文件并处理它。完成后,它将向发送者发送结果。
  • CounterSupervisor:它跟踪哪些FileCounter已经完成了任务,并将结果发送回WordCountForker。
  • WordCountForker:此演员将跟踪哪个子系统完成了其任务,如果它们都忙碌,则创建新的CounterSupervisor来处理问题。

文件计数器应该是最容易编写的。

class FileCounter() extends Actor with ActorLogging {

    import context.dispatcher

    override def preStart = {
        log.info("FileCounter Actor initialized")
    }

    def receive = {
        case CountFile(file) =>
            log.info("Counting file: " + file.getAbsolutePath)

            FileIO.readFile(file).foreach { data =>
                val words = data
                    .split("\n")
                    .map { _.split(" ").length }
                    .sum

                context.parent ! FileCount(words)
            }
    }
}

现在有一个负责监督文件计数器的演员。
class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {

    var total = 0
    var files: Array[File] = _
    var pendingActors = 0

    override def preStart = {
        for(i <- 1 to actorPool)
            context.actorOf(FileCounter.props(), name = s"counter$i")
    }

    def receive = {
        case CountDirectory(base) =>
            log.info("Now counting starting from directory : " + base.getAbsolutePath)
            total = 0
            files = FileIO.getAllFiles(base)
            pendingActors = 0
            for(i <- 1 to actorPool if(i < files.length)) {
                pendingActors += 1
                context.child(s"counter$i").get ! CountFile(files.head)
                files = files.tail
            }

        case FileCount(count) =>
            total += count
            pendingActors -= 1
            if(files.length > 0) {
                sender() ! CountFile(files.head)
                files = files.tail
                pendingActors += 1
            } else if(pendingActors == 0) {
                context.parent ! WordCountTotal(total)
            }
    }
}

然后是监督督导员的演员。
class WordCountForker(counterActors: Int) extends Actor with ActorLogging {

    var busyActors: List[(ActorRef, ActorRef)] = Nil
    var idleActors: List[ActorRef] = _

    override def preStart = {
        val first = context.actorOf(CounterSupervisor.props(counterActors))
        idleActors = List(first)
        log.info(s"Initialized first supervisor with $counterActors file counters.")
    }

    def receive = {
        case msg @ CountDirectory(dir) =>
            log.info("Count directory received")
            val counter = idleActors match {
                case Nil =>
                    context.actorOf(CounterSupervisor.props(counterActors))
                case head :: rest =>
                    idleActors = rest
                    head
            }
            counter ! msg
            busyActors = (counter, sender()) :: busyActors

        case msg @ WordCountTotal(n) =>
            val path = sender().path.toString()
            val index = busyActors.indexWhere { _._1.path.toString == path }
            val (counter, replyTo) = busyActors(index)
            replyTo ! msg
            idleActors = counter :: idleActors
            busyActors = busyActors.patch(index, Nil, 1)
    }
}

我在回答中省略了一些部分,以尽可能简洁,如果你想查看其余代码,请参见我发布的Gist
此外,关于你对效率的担忧,这里的解决方案将防止每个目录有一个子系统,但如果需要,仍会产生多个子系统。

2

在工作人员中,您应该使用become/unbecome功能。如果您的工作人员开始扫描大文件夹,请使用become更改演员行为,忽略另一个消息(或不处理它的响应),在目录扫描后将带有单词计数的消息发送回来,并使用unbecome恢复标准行为。


1
首先,解决命名问题:动态地、独特地给你的演员进行命名,例如:
WorkerActor + "-" + filename ...或者... MasterActor + "-" + directoryName
我是否遗漏了什么?
其次,为什么要调度?在第一个目录完成后开始处理下一个目录不是更合理吗?如果调度是必需的,那么我看到了许多不同的解决方案,并尝试解决其中一些:
1.
三级层次结构:
MasterActor -> DirectoryActor -> WorkerActor
为每个新目录创建一个新的目录演员和一个新的文件工作者。
2.
两级层次结构:
MasterActor -> WorkerActor
为每个文件创建一个新的工作者。
两种标识接收到的结果的选项:
a) 通过询问向工作者分配工作并通过 futures 聚合结果
b) 在作业中包含消息 ID(例如目录名称)
3.
带有负载平衡的两级层次结构:
与选项 2 相同,但您不为每个文件创建新的工作者,而是具有平衡调度程序或最小邮箱路由的固定数量的工作者。

4.
一级层次结构与futures:
主要的执行者没有子级,他只使用futures进行工作和聚合结果。

我还建议阅读Gregor Raýman在他的答案中提到的Akka聚合模式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接