flatMap是否保证是惰性的?

13

考虑以下代码:

urls.stream()
    .flatMap(url -> fetchDataFromInternet(url).stream())
    .filter(...)
    .findFirst()
    .get();

当第一个URL足够时,fetchDataFromInternet会被调用第二次吗?

我尝试了一个更小的示例,看起来像预期的工作方式。即逐个处理数据,但是这种行为可靠吗?如果不是,那么在.flatMap(...)之前调用.sequential()是否有帮助?

    Stream.of("one", "two", "three")
            .flatMap(num -> {
                System.out.println("Processing " + num);
                // return FetchFromInternetForNum(num).data().stream();
                return Stream.of(num);
            })
            .peek(num -> System.out.println("Peek before filter: "+ num))
            .filter(num -> num.length() > 0)
            .peek(num -> System.out.println("Peek after filter: "+ num))
            .forEach(num -> {
                System.out.println("Done " + num);
            });

输出:

Processing one
Peek before filter: one
Peek after filter: one
Done one
Processing two
Peek before filter: two
Peek after filter: two
Done two
Processing three
Peek before filter: three
Peek after filter: three
Done three

更新:如果实现方案很重要,使用官方Oracle JDK8。

答案: 根据下面的评论和答案,flatmap部分是惰性的。也就是说,它会完全读取第一个流,并且只有在需要时才进行下一个流的读取。读取一个流是急切的,但读取多个流是惰性的。

如果此行为是有意的,API应该允许函数返回Iterable而不是流。

换句话说:链接


2
关于并行性的文档指出:“当你创建一个流时,它总是串行流,除非另有说明。”因此,不需要调用.sequential() - teppic
你为什么认为它不是呢? - pedromss
@pedromss 文档没有明确说明。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#flatMap-java.util.function.Function- 看起来有一些情况可能不是惰性的:https://dev59.com/Bl4b5IYBdhLWcg3wYQmZ - balki
@balki,你链接的SO帖子在被接受的答案中指出,中间操作始终是惰性的。此外,根据文档:“流是惰性的;只有在启动终端操作时才对源数据进行计算,并且仅在需要时才消耗源元素。”Flatmap是一种中间操作。 - pedromss
谢谢!对我来说不是很明显。如果您添加答案,我会接受的。 - balki
3
fetchDataFromInternet 不会被多次调用,但是特定的 fetchDataFromInternet 调用返回的元素可能会在没有延迟处理的情况下被处理。 - Holger
3个回答

15

在当前实现下flatmap是急切的; 就像任何其他有状态的中间操作(比如sorteddistinct)。而且很容易证明:

 int result = Stream.of(1)
            .flatMap(x -> Stream.generate(() -> ThreadLocalRandom.current().nextInt()))
            .findFirst()
            .get();

    System.out.println(result);

由于flatMap是急切计算的,因此它永远不会结束。 对于您的示例:

urls.stream()
    .flatMap(url -> fetchDataFromInternet(url).stream())
    .filter(...)
    .findFirst()
    .get();

这意味着对于每个urlflatMap将阻止其后的所有其他操作,即使您只关心其中一个也是如此。因此,假设从单个url中生成了10_000行,那么您的findFirst将不得不等待计算所有这些行,即使您只关心其中一个。

编辑

这在Java 10中已经被修复,我们可以恢复我们的惰性特性:请参见JDK-8075939

编辑2

这在Java 8中也已经被修复(8u222):JDK-8225328


看起来它也已经被回溯到Java 8了。 - ZhekaKozlov
@ZhekaKozlov 感谢您提供的信息 - 如果您愿意,您也可以编辑答案。 - Eugene

5

不清楚您为什么设置了一个与您感兴趣的实际问题无关的示例。如果您想知道使用类似于findFirst()的短路操作时处理是否会懒惰,那么请使用使用findFirst()的示例,而不是处理所有元素的forEach。此外,将日志记录语句放入您想要跟踪的函数中:

Stream.of("hello", "world")
      .flatMap(s -> {
          System.out.println("flatMap function evaluated for \""+s+'"');
          return s.chars().boxed();
      })
      .peek(c -> System.out.printf("processing element %c%n", c))
      .filter(c -> c>'h')
      .findFirst()
      .ifPresent(c -> System.out.printf("found an %c%n", c));

flatMap function evaluated for "hello"
processing element h
processing element e
processing element l
processing element l
processing element o
found an l

这表明传递给 flatMap 的函数按预期惰性地评估,而返回的(子)流的元素不会尽可能懒惰地评估,正如您已经在链接的问题和答案中讨论的那样。

因此,关于你的fetchDataFromInternet方法,该方法被从传递给flatMap的函数调用,你将得到所需的惰性。但是它返回的数据却不会是惰性的。


1
今天我也遇到了这个bug。行为并不是那么直接,因为简单的情况(如下所示)可以正常工作,但类似的生产代码却无法正常工作。
 stream(spliterator).map(o -> o).flatMap(Stream::of)..flatMap(Stream::of).findAny()

对于不能再等几年迁移到JDK-10的人来说,有一个替代品是真正的懒惰流。它不支持并行。它专门为JavaScript翻译而设计,但对我来说也适用,因为接口相同。
StreamHelper基于集合,但很容易适应Spliterator。

https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/StreamHelper.java


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接