ForkJoinPool - 为什么程序会抛出OutOfMemoryError?

6

我想尝试在Java 8中使用ForkJoinPool,所以我写了一个小程序,用于搜索指定目录下文件名中包含特定关键字的所有文件。

程序:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }

                for(FileSearchRecursiveTask task : recursiveTasks) {
                  filetedFileList.addAll(task.join());
                }

        }
        return filetedFileList;

    }
}

当目录没有太多子目录和文件时,该程序可以正常运行,但如果它非常大,那么它会抛出OutOfMemoryError错误。

我理解最大线程数(包括补偿线程)是有限制的,那么为什么会出现这个错误?我的程序有什么问题吗?

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   

2
如果newWorkStealingPool()的作者希望您认为返回的执行程序始终是ForkJoinPool,则会声明返回类型。由于他们没有这样做,因此不应将返回的对象强制转换为ForkJoinPool。毕竟,使用工厂方法然后假定它具有特定的、甚至未记录的行为是没有意义的。如果您想毫无疑问地获取新的ForkJoinPool,只需使用new ForkJoinPool()... - Holger
主要原因是RecursiveTask和ForkJoinTask都不属于Callable或Runnable类型,因此我不能在ExecutorService上调用方法。我这里有什么遗漏吗? - Atul
1
是的。如果要使用RecursiveTaskForkJoinTask,则需要一个ForkJoinPool,因此您应该使用ForkJoinPool,无论是ForkJoinPool.commonPool()还是显式创建一个新的,例如通过new ForkJoinPool()。如果工厂方法声明的返回类型不适合您的任务,请勿将该工厂方法用于您的任务。 - Holger
谢谢,我会记住的,但是在我看来,作者也应该考虑支持ForkJoinTask。 - Atul
1
为了对称性,是的。但最终,与Executors.newForkJoinPool()(如果存在)和new ForkJoinPool()相比,前者的合同恰好是执行后者所做的事情,因此没有优势。 - Holger
嗯,如果从对象创建的角度来看,这是有道理的,但如果我们从用例覆盖的角度来看,肯定存在一些差距。 - Atul
2个回答

8
您不应该对新任务进行过度的分支。基本上,只要有另一个工作线程有可能接管分支的工作并在本地进行评估,您就可以进行分支。然后,一旦您已经分叉了一个任务,请不要立即调用join()。虽然底层框架将启动补偿线程以确保您的任务将继续进行,而不仅仅是所有线程都阻塞等待子任务,但这将创建大量的线程,可能会超出系统的能力范围。
以下是您代码的修订版本:
public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private static final int TARGET_SURPLUS = 3;
    private File path;
    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if(directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    private List<String> scan(File directory)
    {
        File[] fileList = directory.listFiles();
        if(fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for(File file: fileList) {
            System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
            if(file.isDirectory())
            {
                if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                {
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                }
                else filteredFileList.addAll(scan(file));
            }
            else if(file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }

        for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if(task.tryUnfork()) task.complete(scan(task.path));
        }

        for(FileSearchRecursiveTask task: recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}

处理的方法已经被拆分成一个接收目录作为参数的方法,因此我们可以在本地使用它来处理任意目录,而不一定与FileSearchRecursiveTask实例相关。

然后,该方法使用getSurplusQueuedTaskCount()来确定本地排队但尚未被其他工作线程捡起的任务数量。确保有一些帮助工作平衡。但是,如果这个数字超过了阈值,处理将在本地完成,而不会分叉更多的作业。

在本地处理之后,它遍历任务并使用tryUnfork()来识别未被其他工作线程窃取并在本地处理的作业。倒序迭代以从最年轻的作业开始提高发现的机会。

仅在此之后,它使用join()与所有子作业连接,这些子作业现在已经完成或当前由另一个工作线程处理。

请注意,我更改了启动代码以使用默认池。这使用“ CPU核数减1”个工作线程,加上启动线程,即本例中的main线程。


3

只需要进行一项小修改。您需要按照以下方式为newWorkStealingPool指定并行性:

ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(5);

根据文档:

newWorkStealingPool(int parallelism) -> 创建一个线程池,维护足够数量的线程以支持给定的并行级别,并可能使用多个队列以减少争用。并行级别对应于活动参与任务处理或可用于参与任务处理的线程的最大数量。实际线程数可以动态增长和缩小。工作窃取池不能保证提交的任务执行顺序。

根据Java Visual VM截图,该并行性允许程序在指定的内存范围内运行,不会发生内存溢出。 enter image description here

还有一件事(不确定是否会产生影响):

更改fork调用和将任务添加到列表中的顺序。也就是说,更改

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();

to

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
task.fork();
recursiveTasks.add(task);

2
默认情况下,Runtime.getRuntime().availableProcessors()被用作parallelism参数。如果在具有有限CPU数量的虚拟机上运行代码,则通过设置为5,您可能实际上已经增加了并行性。 - Karol Dowbecki
@KarolDowbecki 对,我认为我在这方面找到的唯一提示是ForJoinPool内部的注释:除非已经有足够的活动线程,否则tryCompensate()方法可能会创建或重新激活一个备用线程来弥补被阻塞的joiner直到它们解除阻塞 - Eugene
但是使用并行处理帮助我避免了内存不足的问题。请看屏幕截图。 - KayV
@KayV - 我认为您在 <PATH> 中可能有较少的文件。我尝试使用参数,但出现了相同的错误。 - Atul

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接