Java 8: 并行流等待所有线程完成任务

5

使用并行流处理大量带有大量数据的文件,并将它们写入特定格式。以下是代码:

public static void main(String[] args) throws Exception {
   mergeController.compactFiles();
   mergeController.writeMergedFlag();
}
private void compactFiles() {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration));
}

这些方法并不返回任何东西,它们只是在写入文件。我发现writeMergedFlag()通常在运行过程中1.5小时后被调用。

这里有什么问题吗?是堆空间问题还是其他问题?我以前没有遇到过这种问题。


2
有什么问题/你的期望是什么? - Jasper Huzen
期望的是并行流在继续执行下一条语句之前完成所有文件的写入。 - Himanshu Yadav
1
@HimanshuYadav 这就是发生的事情,也是为什么“下一条语句”(调用writeMergedFlag())要等到1.5小时后才会执行。由于您期望它等待,并且它确实在等待,因此不清楚您认为问题是什么。 - Andreas
2个回答

4

我认为这是因为并行流使用了一个固定数量线程的 ForkJoinPool。如果这些 writeEvent 任务很小,我建议使用缓存线程池:

public static void main(String[] args) throws Exception {
    mergeController.compactFiles();
    mergeController.writeMergedFlag();
}

private void compactFiles() {
    Set<String> events = this.listSubDirectoryNames(inputDir);
    ExecutorService service = Executors.newCachedThreadPool();
    events.forEach(event -> service.execute(() -> writeEvent(event, eventSchemaMap.get(event), configuration)));
    service.shutdown();
    service.awaitTermination(1, TimeUnit.DAYS); // Arbitrary value
}

这非常有趣。 是的,这个实用程序正在为总共2TB的较小源文件编写1.5GB合并文件。 - Himanshu Yadav
@HimanshuYadav 如果那个1.5 GB的文件正在加载到内存中,那么一定要增加堆大小,这样JVM就不必动态分配它了。 - Jacob G.
源文件大小很小,例如几KB。该工具会一直读取数据直到达到1.5GB,然后将其写入磁盘。我确保在下一次迭代之前关闭资源。是否有一种方法可以根据同时运行的线程数量来计算堆大小? - Himanshu Yadav
不确定,但将初始堆大小设置为1.5 GB应该可以解决问题。 - Jacob G.

1
所有JVM中的parallelStream()默认使用相同的ForkJoinPool.commonPool(),该池具有numberOfCPUs-1个工作线程。因此,在您的情况下,您需要首先使用分析工具检查哪些部分消耗时间,如果只是要处理许多文件,则可以为并行流使用自定义线程池。
private void compactFiles() throws Exception {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  ForkJoinPool customThreadPool = new ForkJoinPool(4); // you might need to adjust this value to find optimal performance
  customThreadPool.submit(() -> events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration))).get(); //Due to how ForkJoin pool works tasks will be submitted to the same pool which was used to execute parent task
}

http://www.baeldung.com/java-8-parallel-streams-custom-threadpool


如果他想要直接调用writeMergedFlag(),而不必等待所有writeEvents被调用,则同意。 - Jasper Huzen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接