如何并行处理文件中的行?

8
我想读取一个大文件,处理每一行并将结果插入到数据库中。我的目标是将行的处理并行化,因为每个进程都是长时间运行的任务。因此,我希望有一个线程继续读取,多个线程继续处理,并且一个线程保持以块的形式插入到数据库中。
我将其分解如下:
1)按顺序逐行读取文件(容易)
2)将每行发送到线程池(3个线程),因为处理是长时间运行的任务。在线程池忙碌时阻止进一步的行读取。
3)将每个线程池处理的已处理行写入StringBuffer 4)监视缓冲区大小,并以块的形式将结果写入数据库(例如每1000个条目)
ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

问题:

  • future.get() 总是会阻塞读取器,直到一个future返回结果

  • 缓冲区 "monitoring" 可能没有正确处理

我可能做得不对。但是我该如何实现呢?

附注:文件大小约为10GB,因此我无法先将整个文件读入内存以准备并行任务。


这是在暗示只有我需要使用一个有界队列吗? :| - Eugene
3个回答

2
我发现以下解决方案非常优雅。虽然有许多其他可能的解决方案,但它在概念上非常简单,具有以下特点:
  • 限制读取速度
  • 仅累积最少量的状态以在最后报告就绪
  • 不需要显式处理线程
我只在此处放置实际的测试方法,并提供完整的测试设置和辅助数据结构,可以在专用的 GitHub 存储库中找到。
private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                processor.accept(v);
                semaphore.release();
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}

那么,没有仅在队列已满时阻止主线程提交的ExecutorService吗? - membersound
据我所知,它排队了,这就是为什么对你不起作用的原因。 - Oleg Sklyar
1
@membersound 我已经更新了我的答案,使其非常简单明了,避免了线程阻塞的循环。现在真的很喜欢它。 - Oleg Sklyar

0
1. 读取文件大小(使用 File.length() 方法),并将其分割成线程的数量。
2. 使用 RandomAccessFile 在 @1 处找到的索引之前搜索任何换行符号。https://docs.oracle.com/javase/7/docs/api/java/io/RandomAccessFile.html 3. 将新的索引/偏移量和具有读取权限的 RandomAccessFile 发送给每个线程。
4. 子类化 InputStream 来创建一个基于 RandomAccessFile 的新 InputStream,并开始读取。

0
经过深入研究,我发现这个答案中提到的BlockingExecutor最接近我想要实现的目标:

https://stackoverflow.com/a/43109689/1194415

它基本上是将ThreadPoolExecutorSemaphore锁结合在一起扩展的。

1
很酷,这个想法似乎与我上次的努力类似 :) 很高兴你找到了解决方案! - Oleg Sklyar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接