使flatMap的结果流并行化

6

考虑下面这段简单的代码:

Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });

很长一段时间,我认为Java会在flatMap之后对元素进行并行执行。但是上面的代码打印了所有的“Thread: main”,证明了我的想法是错误的。
一个简单的方法是先收集再流式处理,这样就可以在flatMap之后并行执行了。
Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .collect(Collectors.toList())
  .parallelStream()
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });

我在想是否有更好的方法,并对flatMap的设计选择是否只在调用前并行化流进行了思考。
========= 关于问题的更多澄清 ========
从一些答案中可以看出,我的问题没有完全表达清楚。正如@Andreas所说,如果我从3个元素的流开始,可能会有3个线程运行。
但是我的问题确实是:Java Stream使用一个公共的ForkJoinPool,默认大小等于核心数减1,根据这篇帖子。现在假设我有64个核心,那么我期望以上代码在flatMap之后会看到许多不同的线程,但实际上只看到了一个(或者在Andreas的情况下是三个)。顺便说一句,我确实使用了isParallel来观察流是并行的。
老实说,我提出这个问题并不是出于纯学术兴趣。我在一个项目中遇到了这个问题,该项目呈现了一长串用于转换数据集的流操作。该链以单个文件开始,并通过flatMap扩展为大量元素。但显然,在我的实验中,它没有充分利用我的计算机(它有64个核心),而只使用了一个核心(从观察cpu使用率)。

flatMap 这个东西到底有什么用?它只会让问题更加混淆。 - chrylis -cautiouslyoptimistic-
我还要注意的是,你可以随时调用isParallel来查看一个你认为是并行还是顺序流的流实际上是什么。 - chrylis -cautiouslyoptimistic-
3
这是OpenJDK实现的已知限制。如果他们要改变这一点,我认为他们必须先解决其他一些问题。例如,单元素流被隐式视为无序,目前没有影响,但在为“子流”启用并行处理时,将整个流视为无序可能会导致意外情况。如果你想执行并行递归文件处理,可以参考此答案:链接 - Holger
@Holger 很有帮助的评论。我在包描述中看到的是,“流的方向可以通过BaseStream.sequential()和BaseStream.parallel()操作进行修改。” 就这样。解释结束。是否有某个链接可以了解限制? - John
2
@JohnMeyer 内部细节已在这个旧的问答中讨论过。虽然缺少短路问题已经得到解决,但仅按顺序迭代子流的基本行为并没有改变。 - Holger
显示剩余2条评论
3个回答

1
我在想,关于flatMap的设计选择,它只是在调用之前并行化流,而不是在调用之后。你错了。在flatMap之前和之后的所有步骤都是并行运行的,但它仅在线程之间分割原始流。然后,一个这样的线程处理flatMap操作,它的流不会被分割。由于您的原始流只有1个元素,因此它无法被分割,因此parallel没有效果。尝试更改为Stream.of(1, 2, 3),您将看到flatMap之后的forEach实际上在3个不同的线程中运行。

1
这实际上并没有指定的行为。 - chrylis -cautiouslyoptimistic-
@chrylis-cautiouslyoptimistic- 我从未说过它是并行执行的,我只是纠正了 OP 不正确的说法,即 flatMap 后面的步骤没有并行执行。我描述了观察到的行为,就像问题所描述的那样。我对“指定”的行为没有任何声明,并且它在未来可能会发生变化。 - Andreas

0

对于像我这样迫切需要并行处理flatMap并且需要一些实际解决方案,而不仅仅是历史和理论的人。以及那些在并行化之前不考虑收集所有项的人。

我想到的最简单的解决方案是手动平铺,基本上是通过将其替换为map + reduce(Stream::concat)来完成的。

我已经在另一个线程中回答了同样的问题,请参见https://dev59.com/TVcO5IYBdhLWcg3w7lvq#66386078以获取详细信息。


0

forEach的文档指定:

对于任何给定的元素,操作可以在库选择的任何时间和任何线程中执行。

特别地,“在调用线程上执行所有操作”似乎是一个很好的广泛安全的实现。

请注意,您尝试并行化流不需要任何特定的并行性,但使用以下内容更有可能看到效果:

IntStream.range(0, 1024).boxed()
  .parallel()
  .map(i -> "Thread: " + Thread.currentThread().getName())
  .forEach(System.out::println);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接