我想读取一个大文件,处理每一行并将结果插入到数据库中。我的目标是将行的处理并行化,因为每个进程都是长时间运行的任务。因此,我希望有一个线程继续读取,多个线程继续处理,并且一个线程保持以块的形式插入到数据库中。
我将其分解如下:
1)按顺序逐行读取文件(容易)
2)将每行发送到线程池(3个线程),因为处理是长时间运行的任务。在线程池忙碌时阻止进一步的行读取。
3)将每个线程池处理的已处理行写入StringBuffer 4)监视缓冲区大小,并以块的形式将结果写入数据库(例如每1000个条目)
我将其分解如下:
1)按顺序逐行读取文件(容易)
2)将每行发送到线程池(3个线程),因为处理是长时间运行的任务。在线程池忙碌时阻止进一步的行读取。
3)将每个线程池处理的已处理行写入StringBuffer 4)监视缓冲区大小,并以块的形式将结果写入数据库(例如每1000个条目)
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
count.getAndIncrement();
Future<String> future = executor.submit(() -> {
return processor.process(line);
});
//PROBLEM: this blocks until the future returns
sb.append(future.get());
if (count.get() == 100) {
bufferChunk = sb;
count = new AtomicInteger(0);
sb = new StringBuffer();
databaseService.batchInsert(bufferChunk.toString());
}
}
问题:
future.get()
总是会阻塞读取器,直到一个future返回结果缓冲区 "monitoring" 可能没有正确处理
我可能做得不对。但是我该如何实现呢?
附注:文件大小约为10GB,因此我无法先将整个文件读入内存以准备并行任务。