来自 HashSet 的并行流不会并行运行

26

我有一组元素,想要并行处理。当我使用 List 时,并行处理可以正常工作。但是,当我使用 Set 时,它无法并行运行。

我写了一个代码示例来展示这个问题:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}

这是我在 Windows 7 上得到的输出结果。

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

我们可以看到,来自Set的第一个元素必须在处理第二个元素之前完成。对于List,第二个元素在第一个元素完成之前就开始了。

你能告诉我是什么原因导致了这个问题,并且如何使用Set集合来避免它吗?


尝试使用超过两个元素进行测试,例如10个元素或其他数量。仅使用2个元素的结果太模糊了。 - nafas
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Nemo
无论如何,这是10个元素集合的输出(使用10个执行器池)打印: 开始:8 开始:0 开始:4 开始:6 开始:2 结束:2 结束:6 结束:4 结束:0 开始:1 结束:8 开始:9 开始:5 开始:7 开始:3 结束:3 结束:5 结束:9 结束:7 结束:1列表打印: 开始:7 开始:3 开始:0 开始:6 开始:9 开始:8 开始:5 开始:4 开始:2 开始:1 结束:0 结束:6 结束:7 结束:9 结束:2 结束:3 结束:8 结束:5 结束:1 结束:4 并非所有集合元素都可以并行运行。 - Nemo
1个回答

37
我可以重现您看到的行为,其中并行性与您指定的fork-join池并行性不匹配。将fork-join池并行性设置为10,并将集合中元素的数量增加到50后,我看到基于列表的流的并行度仅上升到6,而基于集合的流的并行度从未超过2。
但是,请注意,将任务提交给fork-join池以在该池中运行并行流的技术是一种实现“技巧”,并且不能保证有效。实际上,用于执行并行流的线程或线程池是未指定的。默认情况下,使用公共fork-join池,但在不同的环境中,可能会使用不同的线程池。(考虑应用服务器中的容器。)
java.util.stream.AbstractTask类中,LEAF_TARGET字段确定拆分的数量,从而确定可以实现的并行性。该字段的值基于ForkJoinPool.getCommonPoolParallelism(),当然使用的是公共池的并行性,而不是运行任务的任何池。
可以说这是一个错误(参见OpenJDK问题JDK-8190974),然而,整个区域都未指定。但是,该系统的这一领域肯定需要发展,例如在拆分策略、可用并行性、处理阻塞任务等方面存在其他问题。JDK的未来版本可能会解决其中一些问题。
同时,通过使用系统属性,可以控制公共fork-join池的并行性。如果将此行添加到您的程序中,
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

如果您在公共池中运行流(或者如果您将它们提交到具有足够高的并行级别的自己的池中),您将观察到更多的任务会并行运行。您也可以使用-D选项在命令行上设置此属性。同样,这不是保证的行为,未来可能会发生变化。但是,这种技术可能会在可预见的未来用于JDK 8实现。更新2019-06-12:问题JDK-8190974已在JDK 10中修复,并且该修复程序已经被反向移植到即将发布的JDK 8u222版本中。

1
@SotiriosDelimanolis,请看一下Dimitar在这里的评论。我看到你们也在另一个问题上讨论这个问题。 - Stuart Marks
3
@DimitarDimitrov,我认为这比你想象的要简单。 "可以说这是一个错误"这个声明是关于流内部的分割行为的。它总是基于公共池的并行性进行分割。但是,如果该流针对另一个池(使用未记录的黑客方式),则分割仍由公共池的并行性控制,而不是目标池的并行性控制。 - Stuart Marks
1
@StuartMarks 没错,你对于 AbstractTask 的行为描述是正确的,而我关于 ForkJoinPool 压力过大导致问题的回答是错误的。感谢大家的耐心等待,我会相应地更新我的回答。至于 @SotiriosDelimanolis 在这里的评论,无论这是否可以被归类为一个 bug 以及非默认线程池是否应该被支持,修复这个问题可能需要更多实质性的改变,因为目前这个流不知道它将在哪个并行度较高的线程池中运行。 - Dimitar Dimitrov
1
@pppavan 可能是因为 ArrayList 的元素在数组中密集地打包,所以每个分割都是完整的。HashSet 的元素相对稀疏地分布在其表中的桶中。 - Stuart Marks
1
@d-coder 可能是因为 FJ 池已经被创建了。一旦它被创建,你就不能通过改变属性来改变它的大小。尝试在 main() 的顶部设置属性。我不认为在 main() 之前有任何东西会创建 FJ 池(但这可能会因 JDK 版本而异)。如果失败了,请尝试在命令行上设置属性。 - Stuart Marks
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接