考虑下面这段简单的代码:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
很长一段时间,我认为Java会在
flatMap
之后对元素进行并行执行。但是上面的代码打印了所有的“Thread: main”,证明了我的想法是错误的。一个简单的方法是先收集再流式处理,这样就可以在
flatMap
之后并行执行了。Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.collect(Collectors.toList())
.parallelStream()
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
我在想是否有更好的方法,并对
flatMap
的设计选择是否只在调用前并行化流进行了思考。========= 关于问题的更多澄清 ========
从一些答案中可以看出,我的问题没有完全表达清楚。正如@Andreas所说,如果我从3个元素的流开始,可能会有3个线程运行。
但是我的问题确实是:Java Stream使用一个公共的ForkJoinPool,默认大小等于核心数减1,根据这篇帖子。现在假设我有64个核心,那么我期望以上代码在
flatMap
之后会看到许多不同的线程,但实际上只看到了一个(或者在Andreas的情况下是三个)。顺便说一句,我确实使用了isParallel
来观察流是并行的。老实说,我提出这个问题并不是出于纯学术兴趣。我在一个项目中遇到了这个问题,该项目呈现了一长串用于转换数据集的流操作。该链以单个文件开始,并通过
flatMap
扩展为大量元素。但显然,在我的实验中,它没有充分利用我的计算机(它有64个核心),而只使用了一个核心(从观察cpu使用率)。
flatMap
这个东西到底有什么用?它只会让问题更加混淆。 - chrylis -cautiouslyoptimistic-isParallel
来查看一个你认为是并行还是顺序流的流实际上是什么。 - chrylis -cautiouslyoptimistic-