了解Akka Actor何时结束

5

有几个和我一起工作的人正在尝试找出处理这个问题的最佳方法。似乎这应该是一个经常需要的标准事情,但由于某些原因,我们似乎无法得到正确的答案。

如果我有一些工作要做,并向路由器发送一堆消息,那么如何知道所有工作都已完成呢?例如,如果我们正在读取一个100万行文件的行,并将该行发送给演员进行处理,而您需要处理下一个文件,但必须等待第一个文件完成,那么您如何知道它何时完成?

进一步说一下,我知道并使用了与Patters.ask()一起使用的Await.result()和Await.ready()。一个不同之处在于,每一行都会有一个Future,我们会有一个巨大的Future数组来等待,而不仅仅是一个。此外,我们正在填充大型领域模型,占用了相当多的内存,并且不希望添加额外的内存来保存等待组成的等量未来,而使用演员后,每个演员在完成工作后都不会保留内存以等待组成。

我们正在使用Java而不是Scala。

伪代码:

for(File file : files) {
    ...
    while((String line = getNextLine(fileStream)) != null) {
        router.tell(line, this.getSelf());
    }
    // we need to wait for this work to finish to do the next
    // file because it's dependent on the previous work
}

在使用 actors 时,你可能需要进行大量的工作,并且需要知道何时完成。

2个回答

4
我相信我有一个解决方案,它不需要累积大量的“Future”。首先,介绍一下高级概念。这个流程将有两个角色参与。第一个我们称之为“FilesProcessor”。这个角色将是短暂的和有状态的。每当您想要顺序处理一堆文件时,就会启动此角色的一个实例,并传递一个包含您要处理的文件名称(或路径)的消息。当它完成所有文件的处理后,它会停止自己。第二个角色我们将称之为“LineProcessor”。这个角色是无状态的,长期存在并在路由器后面进行池化。它处理一个文件行,然后回复请求该行处理已经完成的人告诉他们它已经完成了该行的处理。现在进入代码部分。
首先是消息:
public class Messages {

  public static class ProcessFiles{
    public final List<String> fileNames;
    public ProcessFiles(List<String> fileNames){
      this.fileNames = fileNames;
    }
  }

  public static class ProcessLine{
    public final String line;
    public ProcessLine(String line){
      this.line = line;
    }
  }

  public static class LineProcessed{}

  public static LineProcessed LINE_PROCESSED = new LineProcessed();
}

还有 FilesProcessor

public class FilesProcessor extends UntypedActor{
  private List<String> files;
  private int awaitingCount;
  private ActorRef router;

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessFiles){      
      ProcessFiles pf = (ProcessFiles)msg;
      router = ... //lookup router;
      files = pf.fileNames;
      processNextFile();
    }
    else if (msg instanceof LineProcessed){
      awaitingCount--;
      if (awaitingCount <= 0){
        processNextFile();
      }
    }

  }

  private void processNextFile(){
    if (files.isEmpty()) getContext().stop(getSelf());
    else{            
      String file = files.remove(0);
      BufferedReader in = openFile(file);
      String input = null;
      awaitingCount = 0;

      try{
        while((input = in.readLine()) != null){
          router.tell(new Messages.ProcessLine(input), getSelf());
          awaitingCount++;
        }        
      }
      catch(IOException e){
        e.printStackTrace();
        getContext().stop(getSelf());
      }

    }
  }

  private BufferedReader openFile(String name){
    //do whetever to load file 
    ...
  }

}

还有LineProcessor

public class LineProcessor extends UntypedActor{

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessLine){
      ProcessLine pl = (ProcessLine)msg;

      //Do whatever line processing...

      getSender().tell(Messages.LINE_PROCESSED, getSelf());
    }
  }

}

现在,行处理器正在发送响应,但没有额外的内容。如果您需要根据行的处理结果发送一些内容,可以更改此内容。我相信这段代码并不是非常完美,我只是想向您展示一个如何在没有请求/响应语义和 Future 的情况下完成此流程的高级概念。

如果您对此方法有任何疑问或需要更多详细信息,请告诉我,我很乐意提供帮助。


在您的设计中,是什么阻止了ProcessFiles一次处理多个文件?如果它一次接收到50个文件,难道不会同时开始处理所有这些文件吗?也许if(msg instanceof ProcessFiles)也应该添加if(awaitingCount <= 0)? - Steven Edison
这个设计意味着短暂的 FileProcessor 只能在停止自身之前为一个 ProcessFiles 请求提供服务。它并不打算为多个 ProcessFiles 请求提供服务。如果您有更多的请求,请启动更多此 actor 的实例。或者,您可以像您所说的那样,在接收中添加一个 if 块,查看它是否已经处理了一批文件,如果是,则可能只需将新的传入文件列表附加到正在处理的当前文件列表中。 - cmbaxter

0

在路由器上使用context.setRecieveTimeout将消息发送回发件人,并计算处理的消息数量。当总处理的消息数==已发送的数量时,即完成。

如果您的路由器足够忙,以至于setReceiveTimeout无法经常触发,则安排自己的消息以返回计数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接