使用无序终止操作时的Stream.skip行为

36

我已经阅读了这个这个问题,但仍然怀疑JDK作者是否有意让Stream.skip表现出观察到的行为。

让我们使用数字1..20进行简单输入:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

现在让我们创建一个并行流,在不同的方式中使用unordered()skip()结合起来,并收集结果:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));

这里的过滤步骤基本上没有起到任何作用,但增加了流引擎的难度:现在它不知道输出的确切大小,因此一些优化被关闭。我有以下结果:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18
结果完全正常,一切都按预期工作。在第一种情况下,我要求跳过前两个元素,然后无序地收集到列表中。在第二种情况下,我要求跳过第一个元素,然后变成无序的并跳过另一个元素(我不关心哪一个)。在第三种情况下,我先变成无序模式,然后跳过两个任意元素。 让我们跳过一个元素,并以无序模式收集到自定义集合中。我们的自定义集合将是HashSet:
System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));
输出结果令人满意:
skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped

一般来说,只要流是有序的,skip() 就会跳过前面的元素,否则它会随机跳过一些元素。

然而,让我们使用一个等价的无序终端操作 collect(Collectors.toSet())

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));

现在的输出结果为:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped

使用任何其他无序的终端操作(如forEachfindAnyanyMatch等)都可以达到相同的结果。在这种情况下,删除unordered()步骤不会改变任何内容。似乎unordered()步骤正确地使流从当前操作开始无序,而无序终端操作使整个流从最开始就变得无序,尽管如果使用了skip(),这可能会影响结果。对我来说,这似乎完全是误导性的:我期望使用无序收集器与将流转换为无序模式 就在终端操作之前,并使用等效的有序收集器是相同的。

所以我的问题是:

  1. 这种行为是有意为之还是bug?
  2. 如果是,它是否有文档记录?我已阅读Stream.skip()文档:它没有关于无序终端操作的任何说明。此外Characteristics.UNORDERED 文档不太容易理解,也没有说整个流将失去排序。最后,在包概述中的Ordering部分也没有涵盖这种情况。也许我漏掉了什么?
  3. 如果无序终端操作使整个流变得无序,那么为什么unordered()步骤只在此点之后使其无序?我可以依赖这种行为吗?还是我只是运气好,我的第一次测试很顺利?

1
但问题在于 - 没有所谓的“之前”。所有先前的操作都是中间操作,只有当您执行终端操作 - 在这种情况下是收集操作时,流缩减才会发生。 - mikołak
2
正如我之前在这里所说的那样,如果行为是一致的,那么它就更容易理解。这仍然允许在该问题中显示的行为是有意的,但是我们可能认为保留顺序的频率过高是一个错误。你知道,sorted().forEach()不应该排序。 - Holger
1
你的初始代码是否缺少 boxed() 调用?我不能这样 collect(),需要 boxed() - Thomas Weller
1
@Thomas,谢谢,boxed()已添加。.parallelStream().filter(x -> x > 0)是必要的,因为我想揭示问题,而不是消除它们 :-) 当然,这只是一个人工简化的例子。在实践中,如果您使用例如bufferedReader.lines().skip(1).parallel().forEach(...),可能会出现此类问题。请参见链接的问题。 - Tagir Valeev
1
@FedericoPeraltaSchaffner,如果你需要解析带有标题行的文本文件,并且需要高效处理,那么lines.stream().skip(1).parallel().blahblah可能对你很有效。 - Tagir Valeev
显示剩余10条评论
2个回答

30

请记住,流标志(ORDERED、SORTED、SIZED、DISTINCT)的目的是使操作避免执行不必要的工作。涉及流标志的优化示例包括:

  • 如果我们知道流已经排序,则 sorted() 不执行任何操作;
  • 如果我们知道流的大小,我们可以在 toArray() 中预先分配正确大小的数组,避免复制;
  • 如果我们知道输入没有有意义的相遇顺序,则无需采取额外措施来保留相遇顺序。

管道的每个阶段都有一组流标志。中间操作可以注入、保留或清除流标志。例如,过滤保留排序/去重标志,但不保留大小标志;映射保留大小标志,但不保留排序/去重标志。排序注入排序标志。中间操作的标志处理相当直接,因为所有决策都是本地的。

终端操作的标志处理更加微妙。对于终端操作,ORDERED 是最相关的标志。如果终端操作是 UNORDERED,则我们会向后传播无序性。

为什么这样做呢?考虑以下管道:

set.stream()
   .sorted()
   .forEach(System.out::println);

由于forEach不受顺序限制,对列表进行排序的工作完全是浪费的。因此,我们向后传递这个信息(直到遇到一个短路操作,例如limit),以便不失去这个优化机会。同样,我们可以在无序流上使用优化的distinct实现。

这种行为是有意为之还是一个bug?

是的 :) 后向传播是有意为之的,因为它是一种有用的优化,不应产生错误的结果。但是,bug的部分是我们正在传播先前的skip,而我们不应该这样做。因此,UNORDERED标志的后向传播过于激进,这是一个bug。我们将发布一个bug。

如果是的话,它在哪里有记录?

它应该只是一个实现细节;如果正确实现,你不会注意到它(除了你的流会更快)。


2
谢谢!这正是我一直在等待的答案 :-) 我已经在我的库中实现了一个skipOrdered方法来解决这个问题。它接受流分割器,将其转换为顺序流,执行skip(),然后如果需要,将其转回parallel()。希望原始的skip()在JDK9中得到修复,这个方法就不再需要了。 - Tagir Valeev
11
经过一些分析,我们决定完全放弃反向传播(back-propagation)。这样做只有在优化排序时才会有所好处;如果你有一个将数据排序后再输送到无序终端操作的流水线(pipeline),那么这很可能是用户错误。 - Brian Goetz
3
@Brian Goetz:只是想确保理解正确,终端操作不再向后传递无序属性了吗?那是否意味着在这方面forEachforEachOrdered之间不再有差别? - Holger
5
@Holger 正确,现在不再向后传递终端标志,这意味着终端操作的有序或无序不会影响之前操作的行为。当然,forEachforEachOrdered 之间仍然有区别。 - Brian Goetz

1

@Ruben,你可能不理解我的问题。大致上问题是:为什么unordered().collect(toCollection(HashSet::new))的行为与collect(toSet())不同。当然,我知道toSet()是无序的。

也许是这样,但无论如何,我会再试一次。

查看Collectors toSet和toCollection的Javadocs,我们可以看到toSet提供了一个无序收集器

这是一个{@link Collector.Characteristics#UNORDERED unordered}收集器。

即,一个带有UNORDERED特征的CollectorImpl。查看Collector.Characteristics#UNORDERED的Javadoc,我们可以读到:

表示集合操作不承诺保留输入元素的遭遇顺序

在Collector的Javadocs中,我们还可以看到:

对于并发收集器,实现可以自由地(但不是必须)并发实现归约。并发归约是指累加器函数从多个线程同时调用,使用相同的可并发修改的结果容器,而不是在累加过程中保持结果隔离。只有当收集器具有{@link Characteristics#UNORDERED}特征或源数据无序时,才应该应用并发归约。
这意味着如果我们设置了UNORDERED特征,则完全不关心流中元素传递给累加器的顺序,因此可以以任何顺序从管道中提取元素。
顺便说一下,在您的示例中省略unordered()会得到相同的行为:
    System.out.println("skip-toSet: "
            + input.parallelStream().filter(x -> x > 0)
                .skip(1)
                .collect(Collectors.toSet()));

此外,流中的skip()方法给了我们一个提示:
虽然skip()在顺序流管道上通常是一项廉价操作,但在有序并行流管道上可能会非常昂贵。
并且
使用无序流源(例如generate(Supplier))或使用unordered()移除排序约束可能会导致显著加速。
当使用时,
Collectors.toCollection(HashSet::new)

您正在创建一个普通的“有序”收集器(没有无序特征),这意味着您关心排序,因此元素按顺序提取,您会得到预期的行为。

感谢关注我的问题,但这并没有回答我的问题。 "对于并发收集器"部分是不相关的,因为没有任何收集器具有CONCURRENT特性。我知道toSet是无序的,所以它将终端操作转换为无序模式,我在问题中提到了这一点。我还提到删除unordered()不会改变任何内容,因此我知道当我省略unordered()时的相同行为。我不谈论性能,只谈论正确性,因此skip()是否便宜是超出问题范围的。 - Tagir Valeev
最后一句引用提到了“无序流源”或unordered()中间操作。这些都运行得非常好。它并没有提到我遇到问题的无序终端操作。当然,我知道Collectors.toCollection(HashSet::new)是有序收集器。 - Tagir Valeev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接