Java ForkJoinPool创建线程的数量是由什么确定的?

56
据我所理解,ForkJoinPool池会创建一定数量的线程(默认为核心数),并且不会再创建更多的线程(除非应用程序通过使用managedBlock来指示需要更多线程)。
但是,通过使用ForkJoinPool.getPoolSize(),我发现在一个创建了30,000个任务(RecursiveAction)的程序中,执行这些任务的ForkJoinPool平均使用700个线程(每次创建任务时都计算线程数)。这些任务不涉及I/O,只涉及纯计算;任务之间唯一的同步是调用ForkJoinTask.join()和访问AtomicBoolean,即没有线程阻塞操作。
由于我理解join()不会阻塞调用线程,所以池中任何线程都不应该被阻塞,因此(我原以为)不需要创建更多的线程(尽管仍然发生了这种情况)。
那么,为什么ForkJoinPool会创建这么多线程?哪些因素决定了创建线程的数量?
我希望这个问题可以在不发布代码的情况下得到回答,但是如果需要,我当然可以发布完整的程序。
该程序使用深度优先搜索从给定的起点到给定的终点搜索迷宫。解决方案是存在的。主逻辑在SolverTaskcompute()方法中:一个RecursiveAction从某个给定点开始,并继续使用当前点可达的所有相邻点。它不会在每个分支点创建新的SolverTask(这将创建太多任务),而是将除一个以外的所有相邻点压入回溯堆栈以后处理,并仅使用未推送到堆栈的一个邻居点继续执行。一旦它通过这种方式到达死路,就会弹出最近推送到回溯堆栈中的点,并从那里继续搜索(相应地缩短从任务的起点建立的路径)。一旦任务发现其回溯堆栈大于某个特定阈值,它将创建一个新任务;从那时起,该任务将继续从其回溯堆栈中弹出,直到完全清空堆栈。当到达分支点时,它不会再将任何其他点压入其堆栈,而是为每个此类点创建一个新的任务。因此,可以使用堆栈限制阈值调整任务大小。
我引用的数字(“30,000个任务,平均700个线程”)来自搜索5000x5000个单元的迷宫。因此,以下是关键代码:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
5个回答

21

在stackoverflow上有相关的问题:

ForkJoinPool在invokeAll/join期间停滞

ForkJoinPool似乎浪费了一个线程

我制作了一个简化版本的可运行程序,演示了正在发生的情况(我使用的JVM参数为:-Xms256m -Xmx1024m -Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

显然,在执行连接操作时,当前线程会看到所需任务尚未完成,并为自己接管另一个任务。

这发生在java.util.concurrent.ForkJoinWorkerThread#joinTask中。

然而,这个新任务会生成更多相同的任务,但它们无法在池中找到线程,因为线程在等待连接时被锁定。并且由于它没有办法知道它们要释放的时间有多长(线程可能处于无限循环或永久死锁状态),因此会生成新的线程(如Louis Wasserman所述来补偿已加入的线程):java.util.concurrent.ForkJoinPool#signalWork

因此,为防止出现这种情况,您需要避免递归生成任务。

例如,如果在上面的代码中将初始参数设置为1,则活动线程数量将为2,即使您将childrenCount增加十倍。

还请注意,虽然活动线程的数量增加了,但运行线程的数量小于或等于parallelism


14

根据源代码的注释:

补偿:除非已经有足够多的活动线程,否则tryPreBlock()方法可能会创建或重新激活一个备用线程,以弥补阻塞加入者直到它们解除阻塞为止。

我认为正在发生的是你没有很快完成任何任务,由于没有可用的工作线程来提交新的任务,因此会创建一个新的线程。


9
严格、完全严格和终端严格都与处理有向无环图(DAG)有关。您可以谷歌这些术语以全面了解它们。这就是该框架设计用于处理的类型。查看递归API中的代码,该框架依赖于您的compute()代码来执行其他compute()链接,然后执行一个join()。每个任务都执行单个join(),就像处理DAG一样。
您没有进行DAG处理。您正在分叉许多新任务,并在每个任务上等待(join())。请在源代码中阅读。它非常复杂,但您可能能够理解它。该框架不执行正确的任务管理。当它执行join()时,它将等待任务放在哪里?没有暂停队列,这需要监视线程不断查看已完成的队列。这就是为什么该框架使用“续延线程”的原因。当一个任务执行join()时,该框架假定它正在等待单个较低任务完成。如果存在多个join()方法,则线程无法继续,因此需要存在帮助程序或续延线程。
如上所述,您需要一种散布-聚集型fork-join过程。在那里,您可以分叉尽可能多的任务。

你知道在哪里可以找到分散-聚集类型的fork-join进程吗?你有机会看过Scala吗?它是否更好一些?我正要深入研究这篇文章,也许我能在那里找到答案:A Java™ Fork-Join Conqueror - JimLohse
问题仍然存在于Scala...在另一个问题上,刚刚下载了Java的TymeacSE并将尝试一下。感谢您在我之前的评论中发布的有趣文章。 - JimLohse

5

Holger Peineelusive-code 发布的两个代码片段实际上并没有遵循 javadoc for 1.8 version 中推荐的最佳实践:

在最典型的用法中,fork-join 对会像并行递归函数的调用 (fork) 和返回 (join) 一样。与其他形式的递归调用一样,返回 (join) 应该从最内层开始执行。例如,a.fork(); b.fork(); b.join(); a.join(); 比先加入 a 再加入 b 的代码要高效得多。

在这两种情况下,FJPool 是通过默认构造函数实例化的。这将导致使用默认值 asyncMode=false 来构建池,这是默认值:

@param asyncMode如果为true,则为永远不会加入的分叉任务建立本地先进先出调度模式。在仅处理事件样式异步任务的应用程序中,此模式可能比默认的基于本地堆栈的模式更合适。对于默认值,请使用false。 这样工作队列实际上是lifo: head -> | t4 | t3 | t2 | t1 | ... | <- tail 因此,在代码片段中,他们使用fork()将所有任务推送到堆栈中,然后按相同顺序join(),即从最深的任务(t1)到最高的任务(t4),有效地阻塞直到其他线程窃取(t1),然后是(t2)等等。由于有足够的任务来阻止所有池线程(task_count>>pool.getParallelism()),因此补偿会启动,就像Louis Wasserman所描述的那样。

3
值得注意的是,由elusive-code发布的代码的输出取决于java的版本。 在java 8中运行代码,我看到输出:
...
counter=0.73 activeThreads=45 runningThreads=5 poolSize=49 queuedTasks=105 queuedSubmissions=0 parallelism=2 stealCount=3056
counter=0.75 activeThreads=46 runningThreads=1 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3158
counter=0.77 activeThreads=47 runningThreads=3 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3157
counter=0.74 activeThreads=45 runningThreads=3 poolSize=51 queuedTasks=5 queuedSubmissions=0 parallelism=2 stealCount=3153

但在Java 11中运行相同的代码,输出结果不同:

...
counter=0.75 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=4 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.76 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=3 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.77 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=2 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.78 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=1 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.79 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接