并行无限Java流耗尽内存

16

我在尝试理解为什么下面这个带有.parallel()的Java程序会引起OutOfMemoryError,而对应的没有使用.parallel()的程序则不会。

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

我有两个问题:

  1. 这个程序的预期输出是什么?

    如果没有使用 .parallel(),似乎它只会输出 sum(1+2+3+...),这意味着它只是在 flatMap 中卡住了第一个流,这很合理。

    使用 parallel 时,我不知道是否有预期行为,但我猜测它以某种方式交错了前 n 个或多个流,其中 n 是并行工作者的数量。基于分块/缓冲行为,它也可能略有不同。

  2. 是什么导致它耗尽内存?我特别想了解这些流在幕后是如何实现的。

    我猜测是某些东西阻塞了流,因此它永远无法完成并且无法摆脱生成的值,但我不太清楚评估顺序以及缓冲发生的位置。

编辑:如果有关系,我正在使用Java 11。

编辑2:显然,即使对于简单的程序 IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(),也会发生同样的情况,因此它可能与 limit 的惰性有关,而不是与 flatMap 有关。


parallel() 内部使用 ForkJoinPool。我猜测 ForkJoin 框架是从 Java 7 开始引入的。 - aravind
3个回答

11
你说“但我不太清楚计算的顺序和缓冲发生的位置”,这正是并行流所涉及的。评估顺序是未指定的。
你示例的一个关键方面是.limit(100_000_000)。这意味着实现不能只总结任意值,而必须总结前100,000,000个数字。请注意,在参考实现中,.unordered().limit(100_000_000)不会改变结果,这表明没有特殊的无序情况实现,但这是一个实现细节。
现在,当工作线程处理元素时,它们不能只将它们加起来,因为它们必须知道它们被允许消耗哪些元素,这取决于其特定工作负载之前有多少元素。由于该流不知道大小,只有在前缀元素被处理后才能知道这一点,而对于无限流,这永远不会发生。因此,工作线程暂时保持缓冲,直到这些信息变得可用。
原则上,当工作线程知道它处理最左边的¹工作块时,它可以立即对元素求和、计数,并在达到限制时发出结束信号。因此,流可以终止,但这取决于许多因素。
在你的情况下,一个合理的情况是其他工作线程在分配缓冲区方面比最左边的工作更快。在这种情况下,微妙的时间变化可能会使流偶尔返回值。
当我们减慢除处理最左侧块的线程以外的所有工作线程时,我们可以使流终止(至少在大多数运行中)。
System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

我正在遵循Stuart Marks的建议,在谈论遇到顺序而不是处理顺序时使用从左到右的顺序。


非常好的答案!我想知道是否存在这样的风险,即所有线程都开始运行flatMap操作,而没有一个被分配到实际清空缓冲区(求和)?在我的实际用例中,无限流是太大而无法保存在内存中的文件。我想知道如何重写流以降低内存使用率? - Thomas Ahle
1
你是否在使用 Files.lines(…)?在Java 9中对其进行了显着改进。 - Holger
看起来它只是调用了 BufferedReader.lines,这只是一个简单行迭代器的分裂器包装器。因此,我预计它会一直推入我的内存,直到我用完空间,就像 iterate 一样。但当然我还需要测试它。 - Thomas Ahle
1
这是Java 8中的功能。在更新的JRE中,在某些情况下(不是默认文件系统,特殊字符集或大小大于Integer.MAX_FILES),它仍会回退到BufferedReader.lines()。如果其中之一适用,则自定义解决方案可能有所帮助。这值得一个新的问答... - Holger
啊,现在有一个很好的FileChannelLinesSpliterator。但我不确定它是否适用于flatMap(path-> Files.lines(path)),因为flatMap会将内部流转换为顺序流并将所有内容推送出去。(如果我理解正确) - Thomas Ahle
1
外部流是什么?它是文件流吗?它的大小是否可预测? - Holger

5
我的最佳猜测是添加parallel()会改变flatMap()的内部行为,在此之前已经存在懒惰评估的问题
你遇到的OutOfMemoryError错误在[JDK-8202307] 在使用flatMap的无限/非常大的流时调用Stream.iterator().next()时获得Java堆空间不足错误中有报告。如果你查看票据,它与你所遇到的堆栈跟踪基本相同。该票据被关闭并给出了以下原因:不予修复。

iterator()spliterator() 方法是“紧急出口”,在无法使用其他操作时使用。它们有一些限制,因为它们将流实现的推模型转换为拉模型。这种转换在某些情况下需要缓冲,例如当一个元素被(扁平化)映射到两个或多个元素时。如果支持反压力的概念以通信要通过嵌套层次的元素生产来拉取多少元素,那么这将显著地复杂化流实现,可能会牺牲常见情况。


这非常有趣!推/拉转换需要缓冲,可能会使用内存,这是有道理的。然而,在我的情况下,似乎仅使用push应该可以正常工作,并简单地丢弃剩余的元素,因为它们出现了?或者你是说flapmap会导致创建一个迭代器? - Thomas Ahle

3

OOME(内存溢出错误)的原因不是由于流是无限的,而是由于它不是无限的。

也就是说,如果您将.limit(...)注释掉,它将永远不会耗尽内存--但当然,它也永远不会结束。

一旦它被分割,如果它们在每个线程中累积(看起来实际的累加器是Spliterators$ArraySpliterator#array),则流只能跟踪元素数。

似乎您可以在没有flatMap的情况下重现它,只需使用-Xmx128m运行以下内容:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

然而,注释掉limit()后,直到您决定保护您的笔记本电脑为止,它应该可以正常运行。除了实际的实现细节外,以下是我认为正在发生的事情:使用limit时,sum缩减器希望前X个元素求和,因此没有线程会发出部分总和。每个“切片”(线程)都需要累积元素并通过它们传递。如果没有限制,则没有这样的约束,因此每个“切片”将只计算它获得的元素的部分总和(永远),假设它最终会发出结果。

@ThomasAhle parallel() 会在内部使用 ForkJoinPool 来实现并行处理。Spliterator 将被用来将工作分配给每个 ForkJoin 任务,我想我们可以称这里的工作单元为“split”。 - Karol Dowbecki
但是为什么这只发生在限制时呢? - Thomas Ahle
有趣的是,虽然 IntStream.iterate(1,i->i+1).parallel().sum() 运行良好(不停止,但也不崩溃);而版本 IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum() 则会耗尽内存。这表明 limit 做了一些奇怪的事情,不仅仅是对元素进行排序。 - Thomas Ahle
1
@ThomasAhle 在 Integer.sum() 中设置一个断点,该函数被 IntStream.sum 归约器使用。您会发现无限制版本一直调用该函数,而有限制版本在 OOM 之前从未调用它。 - Costi Ciudatu
@CostiCiudatu 你认为这是为什么呢? - Thomas Ahle
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接