考虑经典的“单词计数”程序。它计算某个目录中所有文件中单词的数量。主节点接收到某个目录并将工作分配给Worker actors(每个worker处理一个文件)。这是伪代码:
class WordCountWorker extends Actor {
def receive = {
case FileToCount(fileName:String) =>
val count = countWords(fileName)
sender ! WordCount(fileName, count)
}
}
class WordCountMaster extends Actor {
def receive = {
case StartCounting(docRoot) => // sending each file to worker
val workers = createWorkers()
fileNames = scanFiles(docRoot)
sendToWorkers(fileNames, workers)
case WordCount(fileName, count) => // aggregating results
...
}
}
但我希望能够按计划运行此字数统计程序(例如每1分钟),并提供不同的目录进行扫描。
Akka为安排消息传递提供了不错的方式:
system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))
但是,以上调度程序的问题在于,当调度程序发送新消息时,但上一条消息尚未处理时(例如我发送消息以扫描某个大目录,1秒后我发送另一条消息以扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的
WordCountMaster
将从正在处理不同目录的工作人员接收到WordCount
消息。
作为解决方法,我可以安排执行某些代码块的执行,该代码块将每次创建新的WordCountMaster
。即一个目录=一个WordCountMaster
。但是我认为这是低效的,并且我还需要注意为WordCountMaster
提供唯一名称,以避免出现InvalidActorNameException
。
所以我的问题是:我应该像上面一段中提到的那样为每个tick创建新的WordCountMaster
吗?还是有更好的想法/模式如何重新设计此程序以支持调度?
一些更新: 对于每个目录创建一个主actor,我有一些问题:
- 演员命名问题
InvalidActorNameException:actor name [WordCountMaster] is not unique!
和
InvalidActorNameException:actor name [WordCountWorker] is not unique!
我可以克服这个问题,只需不提供演员名称即可。但在这种情况下,我的演员会收到自动生成的名称,例如$a
,$b
等。这对我来说不好。
- 配置问题:
我想将我的路由器配置排除在application.conf
之外。也就是说,我想为每个WordCountWorker
路由器提供相同的配置。但是,由于我无法控制演员名称,因此无法使用以下配置,因为我不知道演员名称:
/wordCountWorker{
router = smallest-mailbox-pool
nr-of-instances = 5
dispatcher = word-counter-dispatcher
}
WordCountMaster
发送两条消息,分别带有不同的目录:StartCounting(directory_1)
和StartCounting(directory_2)
,那么WordCountMaster
将会接收到来自不同目录的结果。也就是说,这些消息将包含来自不同目录的文件WordCount(fileName, count)
。 - WelcomeTo