Java Stream API:为什么要区分顺序执行和并行执行模式?

21
Stream javadoc中得知:
流管道可以顺序或并行执行。这种执行模式是流的属性。创建流时可以选择初始的顺序或并行执行。
我的假设:
1.顺序/并行流没有功能上的区别。输出永远不会受到执行模式的影响。
2.并行流总是更可取的,如果有足够的核心数和问题规模来证明其开销,则由于性能增益而优先使用。
3.我们希望编写一次代码,无论硬件如何都可以运行(毕竟这是Java)。
假设这些假设是有效的(对元假设没有什么问题),那么在api中公开执行模式的价值在哪里?
似乎你应该只需声明一个流,顺序/并行执行的选择应该由位于下层的库代码或JVM自动以可用核心数、问题规模等为函数处理。
当然,假设并行流也适用于单核机器,可能始终使用并行流可以实现这一点。但这真的很丑陋—为什么在我的代码中显式引用并行流,而这是默认选项呢?
即使存在一种场景,您想要故意硬编码使用顺序流-为什么没有子接口SequentialStream来实现此目的,而不是在流中污染一个执行模式开关?

2
如果你的代码不是线程安全的呢? - SLaks
1
嗯,说得好 - 但这不是例外而不是一般情况吗?如果你正在进行函数式编程,那么你的代码应该从设计上就是线程安全的。如果你不是这样做的,那么为什么要首先使用流和lambda呢? - davnicwil
4个回答

29
看起来你只需要声明一个 Stream,然后顺序/并行执行的选择应该在底层由库代码或JVM自动处理,这取决于运行时可用的核心数、问题的大小等因素。
现实情况是:a)Stream只是一个库,没有特殊的JVM魔法;b)你不能设计一个聪明到可以自动判断正确决策的库。没有合理的方法来估算某个函数的成本,除非你运行它 - 即使你能查看其实现细节,它也是不可能的,同时引入基准测试的影响到每个Stream操作,以确定并行化是否值得并行化开销的代价。那是不切实际的,尤其是考虑到您事先无法知道并行化开销的严重程度。
在适当的核心数量和问题大小下,平行流始终优于顺序流,由于性能增益,但在实践中并非总是如此。有些任务太小了,不值得并行化,并行化总是会有一些开销。(坦白说,大多数程序员往往高估并行性的效用,在真正损害性能时无处不用),基本上,这是一个足够难的问题,你基本上必须把它推给程序员。

我同意。我认为大多数情况下,如果任务很小,就没有必要并行化。这确实取决于硬件、操作类型等因素。 - Ushox
1
好的回答。请参考像这个问题(特别是链接的lambda-dev讨论)这样的事情:https://dev59.com/IGAh5IYBdhLWcg3wE_5P,了解尝试过于聪明地处理核心库中如此重要和基础的部分所面临的困难。 - Shorn
这是一个有趣的观点,认为这是一个足够困难的问题,不应该在库代码中尝试解决。但是,话虽如此,我认为通过启发式方法估计其是否值得开销并不可怕。您知道核心数量和流的大小。无论如何,我想这是一个已经做出的决定,完全回避了这个问题,并像你所说的那样将其推给程序员 - 毕竟Java是一种语言,而不是框架。 - davnicwil

5
这个问题中,有一个有趣的案例表明并行流有时可能比顺序流慢几个数量级。 在那个特定的例子中, 并行版本运行了十分钟,而顺序版本只需几秒钟。

3

顺序流和并行流之间没有功能性差异。输出永远不会受执行模式的影响。

顺序流和并行流执行方式有所区别。在下面的代码中,TEST_2 的结果表明并行线程执行比顺序方式快得多。

如果合适的核心数和问题规模能够证明开销的价值,则并行流始终是首选,因为可以获得性能提升。

事实并非如此。如果任务不值得(简单任务)在并行线程中执行,那么我们只是给我们的代码添加了额外的开销。TEST_1 的结果显示了这一点。此外,请注意,如果所有工作线程都在一个并行执行任务上忙碌,那么您代码中其他地方的并行流操作将等待该任务。

我们希望编写代码一次,在任何地方运行,而无需关心硬件(毕竟这是 Java)。

由于只有编程人员知道是否应该在 CPU 的情况下并行/顺序执行该任务。因此,Java API 向开发人员公开这两个选项。

import java.util.ArrayList;
import java.util.List;

/*
 * Performance test over internal(parallel/sequential) and external iterations.
 * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
 * 
 * 
 * Parallel computing involves dividing a problem into subproblems, 
 * solving those problems simultaneously (in parallel, with each subproblem running in a separate thread),
 *  and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, 
 *  which enables you to more easily implement parallel computing in your applications. However, with this framework, 
 *  you must specify how the problems are subdivided (partitioned). 
 *  With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you.
 * 
 * Limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the -Djava.util.concurrent.ForkJoinPool.common.parallelism=1,
 *  so that the pool size is limited to one and no gain from parallelization
 *  
 *  @see ForkJoinPool
 *  https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
 *  
 *  ForkJoinPool, that pool creates a fixed number of threads (default: number of cores) and 
 *  will never create more threads (unless the application indicates a need for those by using managedBlock).
 *   *  https://dev59.com/6uo6XIcBkEYKwwoYQiK7
 *  
 */
public class IterationThroughStream {
    private static boolean found = false;
    private static List<Integer> smallListOfNumbers = null;
    public static void main(String[] args) throws InterruptedException {


        // TEST_1
        List<String> bigListOfStrings = new ArrayList<String>();
        for(Long i = 1l; i <= 1000000l; i++) {
            bigListOfStrings.add("Counter no: "+ i);
        }

        System.out.println("Test Start");
        System.out.println("-----------");
        long startExternalIteration = System.currentTimeMillis();
        externalIteration(bigListOfStrings);
        long endExternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for externalIteration(bigListOfStrings) is :" + (endExternalIteration - startExternalIteration) + " , and the result found: "+ found);

        long startInternalIteration = System.currentTimeMillis();
        internalIteration(bigListOfStrings);
        long endInternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for internalIteration(bigListOfStrings) is :" + (endInternalIteration - startInternalIteration) + " , and the result found: "+ found);





        // TEST_2
        smallListOfNumbers = new ArrayList<Integer>();
        for(int i = 1; i <= 10; i++) {
            smallListOfNumbers.add(i);
        }

        long startExternalIteration1 = System.currentTimeMillis();
        externalIterationOnSleep(smallListOfNumbers);
        long endExternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :" + (endExternalIteration1 - startExternalIteration1));

        long startInternalIteration1 = System.currentTimeMillis();
        internalIterationOnSleep(smallListOfNumbers);
        long endInternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :" + (endInternalIteration1 - startInternalIteration1));




        // TEST_3
        Thread t1 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t2 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t3 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t4 = new Thread(IterationThroughStream :: internalIterationOnThread);

        t1.start();
        t2.start();
        t3.start();
        t4.start();

        Thread.sleep(30000);
    }


    private static boolean externalIteration(List<String> bigListOfStrings) {
        found = false;
        for(String s : bigListOfStrings) {
            if(s.equals("Counter no: 1000000")) {
                found = true;
            }
        }
        return found;
    }

    private static boolean internalIteration(List<String> bigListOfStrings) {
        found = false;
        bigListOfStrings.parallelStream().forEach(
                (String s) -> { 
                    if(s.equals("Counter no: 1000000")){  //Have a breakpoint to look how many threads are spawned.
                        found = true;
                    }

                }
            );
        return found;       
    }


    private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        for(Integer s : smallListOfNumbers) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return found;
    }

    private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        smallListOfNumbers.parallelStream().forEach( //Removing parallelStream() will behave as single threaded (sequential access).
                (Integer s) -> {
                    try {
                        Thread.sleep(100); //Have a breakpoint to look how many threads are spawned.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
        return found;       
    }

    public static void internalIterationOnThread() {
        smallListOfNumbers.parallelStream().forEach(
                (Integer s) -> {
                    try {
                        /*
                         * DANGEROUS
                         * This will tell you that if all the 7 FJP(Fork join pool) worker threads are blocked for one single thread (e.g. t1), 
                         * then other normal three(t2 - t4) thread wont execute, will wait for FJP worker threads. 
                         */
                        Thread.sleep(100); //Have a breakpoint here.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
    }
}

0
似乎你只需声明一个流,而顺序/并行执行的选择应由下面一层自动处理,无论是库代码还是JVM本身,作为运行时可用核心、问题大小等的函数。
补充已有答案:
这是一个非常大胆的假设。想象一下,为训练某种AI而模拟棋盘游戏,很容易将不同的游戏过程的执行并行化 - 只需创建一个新实例,并让它在自己的线程上运行。由于它不与另一个游戏过程共享任何状态,因此您甚至不必考虑游戏逻辑中的多线程问题。另一方面,如果并行化游戏逻辑本身,则会出现各种多线程问题,并且很可能为复杂性甚至性能付出高昂的代价。
控制流的行为使您(适当限制)具有灵活性,这本身就是良好的库设计的关键特征。

1
我理解你的观点,但我认为流应该是线程安全的是一个公平的假设。如果你的流逻辑*依赖于单线程,因为它依赖于共享内存等,则我不明白为什么你会使用流。在我看来,流是一种函数式编程模式,即数据处理而不关心执行的低级细节 - 顺序、副作用、并行或串行执行 - 这些都是你不必关心的细节。我确实理解你在API中灵活性和控制方面的观点,但我不同意这是一个好的设计选择 :-) - davnicwil

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接