我相信我有一个解决方案,它不需要累积大量的“Future”。首先,介绍一下高级概念。这个流程将有两个角色参与。第一个我们称之为“FilesProcessor”。这个角色将是短暂的和有状态的。每当您想要顺序处理一堆文件时,就会启动此角色的一个实例,并传递一个包含您要处理的文件名称(或路径)的消息。当它完成所有文件的处理后,它会停止自己。第二个角色我们将称之为“LineProcessor”。这个角色是无状态的,长期存在并在路由器后面进行池化。它处理一个文件行,然后回复请求该行处理已经完成的人告诉他们它已经完成了该行的处理。现在进入代码部分。
首先是消息:
public class Messages {
public static class ProcessFiles{
public final List<String> fileNames;
public ProcessFiles(List<String> fileNames){
this.fileNames = fileNames;
}
}
public static class ProcessLine{
public final String line;
public ProcessLine(String line){
this.line = line;
}
}
public static class LineProcessed{}
public static LineProcessed LINE_PROCESSED = new LineProcessed();
}
还有 FilesProcessor
:
public class FilesProcessor extends UntypedActor{
private List<String> files;
private int awaitingCount;
private ActorRef router;
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessFiles){
ProcessFiles pf = (ProcessFiles)msg;
router = ...
files = pf.fileNames;
processNextFile();
}
else if (msg instanceof LineProcessed){
awaitingCount--;
if (awaitingCount <= 0){
processNextFile();
}
}
}
private void processNextFile(){
if (files.isEmpty()) getContext().stop(getSelf());
else{
String file = files.remove(0);
BufferedReader in = openFile(file);
String input = null;
awaitingCount = 0;
try{
while((input = in.readLine()) != null){
router.tell(new Messages.ProcessLine(input), getSelf());
awaitingCount++;
}
}
catch(IOException e){
e.printStackTrace();
getContext().stop(getSelf());
}
}
}
private BufferedReader openFile(String name){
...
}
}
还有LineProcessor
:
public class LineProcessor extends UntypedActor{
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessLine){
ProcessLine pl = (ProcessLine)msg;
getSender().tell(Messages.LINE_PROCESSED, getSelf());
}
}
}
现在,行处理器正在发送响应,但没有额外的内容。如果您需要根据行的处理结果发送一些内容,可以更改此内容。我相信这段代码并不是非常完美,我只是想向您展示一个如何在没有请求/响应语义和 Future
的情况下完成此流程的高级概念。
如果您对此方法有任何疑问或需要更多详细信息,请告诉我,我很乐意提供帮助。
FileProcessor
只能在停止自身之前为一个ProcessFiles
请求提供服务。它并不打算为多个ProcessFiles
请求提供服务。如果您有更多的请求,请启动更多此 actor 的实例。或者,您可以像您所说的那样,在接收中添加一个 if 块,查看它是否已经处理了一批文件,如果是,则可能只需将新的传入文件列表附加到正在处理的当前文件列表中。 - cmbaxter