如何懒惰地评估嵌套的flatMap。

14

我试图从两个可能无限的流中召唤笛卡尔积,然后通过limit()进行限制。

到目前为止,这大致是我的策略:

@Test
void flatMapIsLazy() {
        Stream.of("a", "b", "c")
            .flatMap(s -> Stream.of("x", "y")
                .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                    .mapToObj(sd::repeat)))
            .map(s -> s + "u")
            .limit(20)
            .forEach(System.out::println);
}

这个不起作用。

显然,我的第二个流在管道中第一次使用时就被终止评估了。它没有产生一个我可以按自己的节奏消耗的惰性流。

我认为这段代码中的 .forEach 是有问题的:ReferencePipeline#flatMap

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
               result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}

我预期上述代码会返回20个元素,看起来像:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx

但是,实际上代码会因为一个OutOfMemoryError而崩溃,因为嵌套的flatMap中非常长的Stream被急切地(??)评估,并用重复的字符串填满了我的内存,导致不必要的副本。如果提供的是3而不是Integer.MAX_VALUE,并保持相同的限制为20,则预期的输出将会是:

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)

编辑:目前我已经使用惰性迭代器自己实现了代码。不过,我认为应该有一种方法可以纯粹采用Streams来实现。

编辑2:这已经被认为是Java中的一个错误,并在https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20 上作为一个错误票据被提交。


1
流的大小不提,你试过只运行一次 "x".repeat(Integer.MAX_VALUE) 吗?在我的机器上,我得到了一个 OOM。也许这只是你这里的一个坏例子,但你不能指望它能工作。 - ernest_k
除此之外,.flatMap(s -> second) 无法工作。您正在尝试重用流。这几乎肯定会导致 IllegalStateException - ernest_k
1
一种更符合您原始查询的代码版本可能是:Stream.of("a", "b", "c").flatMap(s -> Stream.of("x", "y").flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE).mapToObj(sd::repeat))).map(s -> s + "u").limit(20).forEach(System.out::println);,但这将导致内存溢出。请注意它包含嵌套的flatMap调用。 - ernest_k
1
@ernest_k 是的,就是这样。我已经修改了问题代码!谢谢! :) - Nirro
1
我假设您会提供一些值(例如整数限制方法)来控制输出的大小。看到几个这样的值的预期输出将是有用的。 - WJS
显示剩余6条评论
1个回答

5

正如你所写的,这已经被认为是一个bug。也许,在未来版本的Java中会解决这个问题。

但是现在可能有一个解决方案。它并不是非常优雅,而且只有在外部流的元素数量和限制都足够小的情况下才可行。但在这些限制下它可以工作。

首先让我稍微修改一下你的示例,将外部的 flatMap 拆分成两个操作(一个 map 和一个 flatMap,只做展平):

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s)
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

我们可以清楚地看到,我们只需要每个内部流中的不超过20个元素。因此,我们可以将每个流限制为这个元素数量。这样就可以实现(你应该使用一个变量或者常量来表示这个限制):
Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s.limit(20))            // limit each inner stream
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

当然,这仍然会产生太多的中间结果,但在上述限制条件下可能不是一个很大的问题。


很好!在我的真实测试代码中,它不起作用,因为我在非常不同的位置和堆栈深度上使用了flatMaplimit,因为内部和外部流是在不同的类中生成的。也许我可以通过调用使限制上下移动,但这有点违背了初衷。无论如何,感谢您的时间和回答。 - Nirro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接