分支/合并框架相对于线程池有什么优势?

153

使用新的fork/join框架相比于一开始就将大任务分成N个子任务,将它们发送到缓存线程池(来自Executors)并等待每个任务完成,有哪些好处呢?我无法看出使用fork/join抽象化简了问题或使解决方案比我们多年来拥有的更有效。

例如,教程示例中的并行模糊算法可以这样实现:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

在开始时进行拆分并将任务发送到线程池:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

任务进入线程池队列,等待工作线程可用时执行。只要分割足够细(避免特别等待最后一个任务),并且线程池有足够的线程(至少N个处理器),所有处理器都会以全速工作直到完成整个计算。
我是否遗漏了什么?使用fork/join框架的附加价值是什么?
11个回答

150

我认为基本的误解在于 Fork/Join 的示例 没有 展示工作的窃取,而只是一种标准的分治算法。

工作窃取看起来像这样:工人 B 完成了他的工作。他很好心,于是他四处张望,发现工人 A 仍然在非常努力地工作。他走过去问:“嘿小伙子,我可以帮你一下吗?”A 回答说:“太好了,我有一个由1000个单元组成的任务。到目前为止,我已经完成了345个,还剩下655个。你能否请处理673到1000之间的内容,我来处理346到672之间的内容。”B 说:“好的,让我们开始吧,这样我们就能早点去酒吧了。”

你看 - 工人们必须在真正开始工作之前进行交流。这是示例中缺失的部分。

另一方面,这些示例只展示了类似于“使用分包商”的内容:

工人 A:“我的天啊,我有1000个工作单位。对我来说太多了。我自己做500个并将500个分包给其他人。”这样做会将大任务拆分成每个10个单位的小数据包。这些小数据包将由可用的工人执行。但如果其中一个数据包比其他数据包花费的时间长得多 -- 那就倒霉了,拆分阶段就结束了。

Fork/Join 和预先拆分任务之间唯一剩下的区别是:预先拆分时,工作队列从一开始就是满的。例如:1000个单位,阈值为10,那么队列就有100个条目。这些数据包被分配给线程池成员。

Fork/Join 更为复杂,尝试保持队列中的数据包数量较少:

  • 步骤1:将包含(1...1000)的一个数据包放入队列中
  • 步骤2:一个工人弹出数据包(1...1000)并替换为两个数据包:(1...500)和(501...1000)。
  • 步骤3:一个工人弹出数据包(500...1000)并推送(500...750)和(751...1000)。
  • 第n步:栈中包含这些数据包:(1..500),(500...750),(750...875)... (991..1000)
  • 第n+1步:弹出并执行数据包(991..1000)
  • 第n+2步:弹出并执行数据包(981..990)
  • 第n+3步:弹出数据包(961..980),将其分割成(961...970)和(971..980)。
  • 可以看到,在Fork/Join中队列较小(例如为6),"split"和"work"阶段是交替进行的。

    当多个工作程序同时弹出和推入时,它们之间的交互当然不是那么清晰。


    我认为这确实是答案。不知道是否有实际的Fork/Join示例可以展示其工作窃取能力?使用基本示例,工作量的数量可以从单位的大小(例如数组长度)完美地预测,因此事先拆分很容易。在问题中,如果无法从单位的大小准确预测工作量,则窃取肯定会产生巨大差异。 - Joonas Pulakka
    如果你的答案是正确的,它并没有解释如何做到的。Oracle给出的示例并没有导致工作窃取。您能否展示一些Java代码,使fork和join按照您所描述的方式窃取工作?谢谢。 - Marc
    @Marc:很抱歉,我没有可用的示例。 - A.H.
    6
    在我看来,Oracle的例子存在问题并不是因为它没有展示工作窃取(正如A.H.所描述的那样,它确实展示了),而是很容易编写一个针对简单线程池的算法,其表现同样优秀(就像Joonas所做的那样)。F-J算法在任务无法被预先分解为足够独立的任务时非常有用,但可以递归地将其分解成互相独立的任务。请参见我的答案,其中有一个例子。 - ashirley
    2
    一些工作窃取可能会派上用场的例子:http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html - volley

    28
    如果你有n个繁忙的线程都以100%的独立方式工作,那将比在Fork-Join(FJ)池中拥有n个线程要好。但事实往往并非如此。
    可能无法将问题精确地分成n个相等的部分。即使您这样做了,线程调度也远远没有公平。你最终会等待最慢的线程。如果您有多个任务,则每个任务可以以少于n路并行运行(通常更有效),但在其他任务完成后可以增加到n路。
    那么为什么我们不将问题切割成FJ大小的块,并由一个线程池处理呢?典型的FJ使用将问题划分为小块。以随机顺序执行这些操作需要在硬件级别上进行大量协调。开销会是致命的。在FJ中,任务被放置到队列中,线程按后进先出(LIFO / stack)顺序读取该队列,并且首先进行核心工作(一般来说是工作窃取/ FIFO /“队列”)。结果是,即使长数组处理被分成微小的块,也可以基本上按顺序完成。(还有一种情况是,在一次大爆炸中将问题分解为小而平均大小的块可能并不容易。例如,处理某种形式的层次结构时可能无法平衡。)
    结论:FJ允许在不均匀情况下更有效地使用硬件线程,如果您有超过一个线程,这种情况将始终存在。

    但是为什么FJ最终不会等待最慢的线程呢?子任务的数量是可以预先确定的,当然其中一些始终是最后完成的。在我的示例中调整maxSize参数将产生几乎类似于FJ示例中的“二进制分裂”的子任务划分(在compute()方法内完成,该方法要么计算某些内容,要么将子任务发送到invokeAll())。 - Joonas Pulakka
    因为它们要小得多 - 我会在我的回答中添加。 - Tom Hawtin - tackline
    如果子任务的数量比实际可以并行处理的数量大几个数量级(这是有意义的,以避免等待最后一个任务),那么我可以看到协调问题。如果划分得太细,则FJ示例可能会误导:它使用100000的阈值,对于1000x1000的图像,将产生16个实际子任务,每个子任务处理62500个元素。对于10000x10000的图像,将有1024个子任务,这已经是相当多了。 - Joonas Pulakka

    26

    线程池和Fork/Join的最终目标是相似的:都希望充分利用可用的CPU资源,以获得最大吞吐量。最大吞吐量意味着应该在长时间内完成尽可能多的任务。要做到这一点,至少需要运行与可用CPU数量相同的线程,因为运行较少的线程会导致核心未被使用。最多应有与可用CPU数量相同的线程在运行,因为运行更多的线程将为调度程序创建额外的负载,调度程序将CPU分配给不同的线程,这会导致一些CPU时间用于调度程序而不是计算任务。

    因此,我们确定了要实现最大吞吐量,需要具有与CPU数量完全相同的线程数。在Oracle的示例中,您可以使用一个具有线程数等于可用CPU数量的固定大小线程池或使用线程池。这不会有任何区别,你是对的!

    那么什么情况下会遇到线程池的麻烦呢?就是当线程被阻塞时,因为您的线程正在等待另一个任务完成。假设以下示例:

    class AbcAlgorithm implements Runnable {
        public void run() {
            Future<StepAResult> aFuture = threadPool.submit(new ATask());
            StepBResult bResult = stepB();
            StepAResult aResult = aFuture.get();
            stepC(aResult, bResult);
        }
    }
    
    这里展示的算法由三个步骤 A、B 和 C 组成。A 和 B 可以相互独立执行,但是步骤 C 需要依赖于步骤 A 和 B 的结果。这个算法提交任务 A 到线程池中并直接执行任务 B。之后,该线程将等待任务 A 完成,并继续执行步骤 C。如果 A 和 B 同时完成,则一切正常。但是如果 A 比 B 花费更长的时间呢?这可能是因为任务 A 的特性所致,也可能是因为一开始没有为任务 A 准备好线程,需要等待。 (如果只有一颗 CPU 可用,因此您的线程池只有一个线程,这甚至会导致死锁,但现在这不是重点)。关键是刚刚执行任务 B 的线程会阻塞整个线程。由于我们有与 CPU 数相同的线程数,并且其中一个线程被阻塞了,这意味着 一个 CPU 空闲
    Fork/Join 可以解决这个问题:在 Fork/Join 框架中,可以按照以下方式编写相同的算法:
    class AbcAlgorithm implements Runnable {
        public void run() {
            ATask aTask = new ATask());
            aTask.fork();
            StepBResult bResult = stepB();
            StepAResult aResult = aTask.join();
            stepC(aResult, bResult);
        }
    }
    

    看起来一样,是吗?然而,关键在于aTask.join不会阻塞。这里就涉及到工作窃取的问题:线程将寻找过去被分叉的其他任务并继续执行它们。首先,它会检查自己分叉的任务是否已经开始处理。如果A还没有被另一个线程启动,那么它将接着做A,否则它将检查其他线程的队列并窃取他们的工作。一旦另一个线程的任务完成了,它将检查A是否已经完成。如果是,则上述算法可以调用stepC 。否则,它将寻找另一个要窃取的任务。因此,fork/join池甚至在面对阻塞操作时也能实现100%的CPU利用率

    但是有一个陷阱:工作窃取仅适用于ForkJoinTaskjoin 调用。无法针对等待另一个线程或等待I/O操作等外部阻塞操作完成的情况进行操作。那么等待I/O完成是常见的任务怎么办?在这种情况下,如果我们可以向Fork/Join池添加一个额外的线程,该线程将在阻塞操作完成后立即停止,这将是第二好的方法。如果我们使用ManagedBlocker ,那么ForkJoinPool实际上可以做到这一点。

    Fibonacci

    《JavaDoc for RecursiveTask》中有一个使用Fork/Join计算斐波那契数列的例子。对于经典的递归解决方案,请参见:

    public static int fib(int n) {
        if (n <= 1) {
            return n;
        }
        return fib(n - 1) + fib(n - 2);
    }
    

    根据JavaDocs的解释,这是一个计算斐波那契数列的相当低效的方法,因为该算法的复杂度为O(2^n),虽然有更简单的方法可用。但是该算法非常简单易懂,因此我们坚持使用它。假设我们想要使用Fork/Join来加快速度。一个天真的实现看起来像这样:

    class Fibonacci extends RecursiveTask<Long> {
        private final long n;
    
        Fibonacci(long n) {
            this.n = n;
        }
    
        public Long compute() {
            if (n <= 1) {
                return n;
            }
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
       }
    }
    

    此任务分为的步骤太短,因此性能表现非常差,但您可以看到框架通常运作得非常好:两个加数可以独立计算,但是我们需要它们来构建最终结果。因此,一半在另一个线程中完成。尝试使用线程池执行相同操作时,请注意避免死锁(可能,但不如简单)。

    仅为完整起见:如果您确实想使用此递归方法计算斐波那契数列,这里有一个经过优化的版本:

    class FibonacciBigSubtasks extends RecursiveTask<Long> {
        private final long n;
    
        FibonacciBigSubtasks(long n) {
            this.n = n;
        }
    
        public Long compute() {
            return fib(n);
        }
    
        private long fib(long n) {
            if (n <= 1) {
                return 1;
            }
            if (n > 10 && getSurplusQueuedTaskCount() < 2) {
                final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
                final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
                f1.fork();
                return f2.compute() + f1.join();
            } else {
                return fib(n - 1) + fib(n - 2);
            }
        }
    }
    

    这种方法能够保持子任务更小,因为只有当n > 10 && getSurplusQueuedTaskCount() < 2条件成立时才进行拆分,这意味着有超过100个方法调用要执行(n > 10),并且没有很多任务在等待执行(getSurplusQueuedTaskCount() < 2)。

    在我的电脑上(4核(8核超线程),Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz),使用经典方法计算fib(50)需要64秒,而使用Fork/Join方法只需要18秒,这是相当明显的性能提升,虽然不如理论上可能的那么多。

    总结

    • 在您的示例中,Fork/Join与经典线程池相比没有优势。
    • 当涉及阻塞时,Fork/Join可以大大提高性能。
    • Fork/Join规避了一些死锁问题。

    21

    与线程池不同,分支/合并使用工作窃取实现。 来源:Fork/Join

    与任何ExecutorService一样,分支/合并框架将任务分配给线程池中的工作线程。 分支/合并框架不同之处在于它使用了一种工作窃取算法。 运行完任务的工作线程可以从其他仍然繁忙的线程中窃取任务。

    假设你有两个线程和4个任务a,b,c,d,它们分别需要1、1、5和6秒钟才能完成。 最初,线程1被分配执行任务a和b,线程2分配执行任务c和d。 在线程池中,这将需要11秒。但是,分支/合并中,线程1完成了它的任务,并且可以从线程2中窃取任务,所以任务d最终由线程1执行。 线程1执行了a,b和d,线程2只执行c。 总时间:8秒,而不是11秒。

    编辑:正如Joonas指出的那样,任务并不一定预先分配给某个线程。 分支/合并的想法是,线程可以选择将任务拆分为多个子部分。 因此,对于上述的重述:

    我们有两个任务(ab)和(cd),它们分别需要2秒和11秒钟才能完成。 线程1开始执行ab,并将其拆分为两个子任务a和b。 同样,线程2也将任务cd拆分为两个子任务c和d。 当线程1完成a和b时,它可以从线程2中窃取d。


    5
    线程池通常是ThreadPoolExecutor实例。在这样的情况下,任务进入一个队列(实际上是BlockingQueue),工作线程会在完成前一个任务后立即获取下一个任务。据我所知,任务不会预先分配给特定的线程。每个线程同时最多只有1个任务。 - Joonas Pulakka
    4
    据我所知,一个ThreadPoolExecutor只有一个队列,该队列控制着多个线程。这意味着在将任务或Runnables(而不是线程)分配给执行器时,任务也没有预先分配给特定的线程。这正是FJ所做的方式。到目前为止,使用FJ没有任何好处。 - A.H.
    1
    @A.H. 是的,但fork/join允许您分割当前任务。执行任务的线程可以将其分成两个不同的任务。因此,使用ThreadPoolExecutor,您有一个固定的任务列表。使用fork/join,执行任务的线程可以将自己的任务分成两个,然后在其他线程完成工作时可以拾取它们。或者你可以先完成。 - Matthew Farwell
    2
    @Matthew Farwell:我可能不完全理解FJ,但如果一个子任务已经决定执行其computeDirectly()方法,那么就没有办法再去窃取任何东西了。至少在这个例子中,整个分割是预先完成的。 - Joonas Pulakka
    1
    @JoonasPulakka:我写了一个回答,试图解决这次讨论中的问题。 - A.H.
    显示剩余2条评论

    15
    在这个例子中,Fork/Join 没有添加任何价值,因为不需要分叉并且工作负载已经平均分配在工作线程中。Fork/Join 只会增加开销。
    这里有一篇与此主题相关的好文。引用:
    “总的来说,我们可以说,在工作负载均匀分配给工作线程的情况下,应该优先选择 ThreadPoolExecutor。要能够保证这一点,您确实需要准确地知道输入数据的结构。相比之下,ForkJoinPool 提供了良好的性能,而不管输入数据是什么,因此是一个更加强大的解决方案。”

    14

    以上所有人都是正确的,工作偷取带来的好处是明显的,但要扩展一下为什么。

    主要好处在于工作线程之间的高效协调。工作必须被分割和重新组装,这需要协调。正如A.H在上面回答的那样,每个线程都有自己的工作列表。这个列表的一个重要属性是它是排序的(大任务在顶部,小任务在底部)。每个线程执行其列表底部的任务并从其他线程的列表顶部窃取任务。

    结果是:

    • 任务列表的头和尾可以独立同步,减少了对列表的争用。
    • 工作的显著子树由同一线程分裂和重新组合,因此对于这些子树不需要进行跨线程协调。
    • 当线程窃取工作时,它会取一个大块然后将其细分到自己的列表中
    • 工作窃取使线程几乎完全利用,直到进程结束。

    大多数使用线程池的其他分治方案需要更多的跨线程通信和协调。


    8
    另一个重要的区别似乎是,使用F-J可以进行多个复杂的“Join”阶段。考虑来自http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html的合并排序,需要太多的编排来预先拆分这项工作。例如:您需要完成以下任务:
    • 对前一季度进行排序
    • 对后一季度进行排序
    • 合并前两个季度
    • 对第三个季度进行排序
    • 对第四个季度进行排序
    • 合并后两个季度
    • 合并两半
    怎么指定必须在涉及它们之前执行排序和合并等操作?
    我一直在研究如何为每个项目列表做某件事情。我想我只需预先拆分列表并使用标准的线程池即可。当工作无法预先分成足够独立的任务但可以递归地拆分为彼此独立的任务时(例如,排序是独立的,但将两个排序好的半部分合并成一个排序好的整体则不是),F-J似乎最有用。

    6

    当您需要进行昂贵的合并操作时,F/J也具有独特的优势。因为它分成了树形结构,所以您只需要进行log2(n)次合并,而线性线程分割则需要n次合并。(这确实做出了理论假设,即您拥有与线程一样多的处理器,但仍然是一个优势)对于一项作业任务,我们必须通过对每个索引处的值求和来合并数千个二维数组(所有数组具有相同的维度)。使用fork join和P个处理器,时间接近于log2(n),当P趋近于无穷大时。

    1 2 3 .. 7 3 1 .... 8 5 4
    4 5 6 + 2 4 3 => 6 9 9
    7 8 9 .. 1 1 0 .... 8 9 9


    3
    您会惊讶于ForkJoin在像爬虫这样的应用程序中的性能表现。以下是您可以从中学习的最佳教程:链接

    Fork/Join的逻辑非常简单:(1)将每个大任务分解成更小的任务(fork);(2)在单独的线程中处理每个任务(如果必要,将其进一步分解为更小的任务);(3)合并结果(join the results)。


    3
    如果问题需要等待其他线程完成(例如排序数组或数组求和),则应使用fork join,因为Executor(Executors.newFixedThreadPool(2))由于有限数量的线程而会出现堵塞。
    在这种情况下,forkjoin池将创建更多线程以弥补受阻线程,以保持相同的并行性。
    实现分治算法的执行器的问题与创建子任务无关,因为Callable可以自由地向其执行器提交新的子任务,并以同步或异步方式等待其结果。问题在于并行性:当Callable等待另一个Callable的结果时,它处于等待状态,从而浪费了处理排队执行的另一个Callable的机会。
    Java SE 7中添加到java.util.concurrent包中的fork/join框架通过Doug Lea的努力填补了这一空白。

    来源: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

    该池尝试通过动态添加、挂起或恢复内部工作线程来维护足够的活动(或可用)线程,即使某些任务因等待加入其他任务而停滞不前。然而,在面对阻塞IO或其他未受管理的同步时,并不能保证进行此类调整。

    public int getPoolSize() 返回已启动但尚未终止的工作线程数。当创建线程以维护并行性时,由此方法返回的结果可能与getParallelism()不同,因为其他线程被协作地阻塞。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接