Stream<Stream>: flatMap vs. reduce

11
如果我执行以下代码将两个流“连接”起来:
  • 首先使用 flatMap 对Stream<Stream<Integer>>进行操作
  • 然后使用 Stream.concat() 缩减一个Stream<Stream<Integer>>
在这两种情况下,我都可以得到相同的正确结果,但过滤操作的数量不同。
public class FlatMapVsReduce {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        Predicate<Integer> predicate1 = i -> {
            System.out.println("testing first condition with " + i);
            return i == 3;
        };

        Predicate<Integer> predicate2 = i -> {
            System.out.println("testing second condition with " + i);
            return i == 7;
        };

        System.out.println("Testing with flatMap");
        Integer result1 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .flatMap(Function.identity())
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result1 = " + result1);

        System.out.println();
        System.out.println("Testing with reduce");
        Integer result2 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .reduce(Stream::concat)
                  .orElseGet(Stream::empty)
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result2 = " + result2);
    }
}

在这两种情况下,我都得到了预期的结果(3)。然而,第一次操作将第一个过滤器应用于集合中的每个元素,而第二次操作则会在遇到一个元素后停止。输出结果为:

Testing with flatMap
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
testing first condition with 4
testing first condition with 5
testing first condition with 6
testing first condition with 7
testing first condition with 8
testing first condition with 9
result1 = 3

Testing with reduce
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
result2 = 3

为什么这两种情况的行为会有差异?JDK代码是否可以改进,使得第一种情况与第二种情况一样高效,或者是flatMap中的某些因素使其不可能实现?

补充说明:以下替代方案与使用reduce的方案一样有效,但我仍然无法解释原因:

    Integer result3 = Stream.of(predicate1, predicate2)
                            .flatMap(c -> list.stream().filter(c).limit(1))
                            .peek(i -> System.out.println("peeking " + i))
                            .findFirst()
                            .orElse(null);
    System.out.println("result3 = " + result3);

3
是的,因为reduce()返回的是Optional<Stream<Integer>>,而不是Stream<Integer>。 - JB Nizet
第一个无限流会发生什么? - user2357112
1
挖掘源代码似乎表明flatMap调用forEach来完全消耗每个子流,出于某种原因。我在浏览代码方面很糟糕,所以我不确定它为什么这样做,或者我是否正确阅读了它。 - user2357112
2
请查看以下链接:“为什么在Java流中,flatMap()后的filter()不完全是惰性的?”(https://dev59.com/Bl4b5IYBdhLWcg3wYQmZ) - Holger
1
请注意,这里有一个概念验证,证明了惰性flatmap是可能的... - Holger
显示剩余6条评论
1个回答

5
openJDK中的flatMap实现来看,我理解flatMap会将整个输入流内容向下推送:
result.sequential().forEach(downstreamAsInt);

另一方面,Stream::concat 似乎处理的是拉模式并且不会一次性发送所有内容。

我怀疑您的测试没有展示全部情况:

  • flatMap 中,只有当第一个流用尽时才考虑第二个流。
  • reduce 中,所有流都被推送到最终连接的流中,因为在消耗输入流的所有内容之前,减少的对象没有意义。

这意味着使用其中任何一个取决于输入的复杂程度。如果您有一个无限的 Stream<Stream<Integer>>,reduce 永远不会结束。


难道不是相反吗?flatMap 不会完成吗? - Yassin Hajaj
4
如果我有一个由无限个整数流组成的有限流,flatMap 永远不会结束。如果我有一个由有限个整数流组成的无限流,reduce 永远不会结束。 - JB Nizet
@JBNizet 当然,因为它无法减少无限数量的流。谢谢。 - Yassin Hajaj

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接