我试图从两个可能无限的流中召唤笛卡尔积,然后通过limit()
进行限制。
到目前为止,这大致是我的策略:
@Test
void flatMapIsLazy() {
Stream.of("a", "b", "c")
.flatMap(s -> Stream.of("x", "y")
.flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);
}
这个不起作用。
显然,我的第二个流在管道中第一次使用时就被终止评估了。它没有产生一个我可以按自己的节奏消耗的惰性流。
我认为这段代码中的 .forEach
是有问题的:ReferencePipeline#flatMap
。
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}
我预期上述代码会返回20个元素,看起来像:
a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx
但是,实际上代码会因为一个OutOfMemoryError
而崩溃,因为嵌套的flatMap
中非常长的Stream
被急切地(??)评估,并用重复的字符串填满了我的内存,导致不必要的副本。如果提供的是3而不是Integer.MAX_VALUE
,并保持相同的限制为20,则预期的输出将会是:
a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)
编辑:目前我已经使用惰性迭代器自己实现了代码。不过,我认为应该有一种方法可以纯粹采用Streams来实现。
编辑2:这已经被认为是Java中的一个错误,并在https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20 上作为一个错误票据被提交。
"x".repeat(Integer.MAX_VALUE)
吗?在我的机器上,我得到了一个 OOM。也许这只是你这里的一个坏例子,但你不能指望它能工作。 - ernest_k.flatMap(s -> second)
无法工作。您正在尝试重用流。这几乎肯定会导致IllegalStateException
。 - ernest_kStream.of("a", "b", "c").flatMap(s -> Stream.of("x", "y").flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE).mapToObj(sd::repeat))).map(s -> s + "u").limit(20).forEach(System.out::println);
,但这将导致内存溢出。请注意它包含嵌套的flatMap调用。 - ernest_k