为什么Java Streams只能使用一次?

248

与C#的IEnumerable不同,在Java中,流(stream)只能被“迭代”一次,并且任何对终端操作的调用都会关闭该流,使其无法再次使用。这种“特性”削弱了其很多功能。

我想,这种限制背后的原因并不是技术性的。这种奇怪的限制背后有哪些设计考虑?

编辑:为了证明我所说的,请考虑在C#中实现快速排序的以下示例:

IEnumerable<int> QuickSort(IEnumerable<int> ints)
{
  if (!ints.Any()) {
    return Enumerable.Empty<int>();
  }

  int pivot = ints.First();

  IEnumerable<int> lt = ints.Where(i => i < pivot);
  IEnumerable<int> gt = ints.Where(i => i > pivot);

  return QuickSort(lt).Concat(new int[] { pivot }).Concat(QuickSort(gt));
}

请注意,我并不是在提倡这是一个好的快排实现!然而,它展示了lambda表达式和stream操作相结合的表现力。

而且在Java中无法完成!我甚至不能询问一个流是否为空,否则它就无法使用。


4
能否给出一个具体的例子,说明关闭流程如何“削弱权力”? - Rogério
23
如果您希望多次使用数据流中的数据,则需要将其转储到集合中。这基本上是必须的操作:要么您必须重新计算以生成数据流,要么您必须存储中间计算结果。 - Louis Wasserman
5
好的,但在相同的数据流上重新执行相同的计算听起来不太对。一个数据流是在进行计算之前从给定的来源创建的,就像迭代器在每次迭代时都会被创建一样。我仍然希望看到一个具体的实例;最终,我打赌使用一次性数据流可以找到一种清晰的方法来解决每个问题,假设在C#的可枚举对象中存在相应的方式。 - Rogério
2
一开始我感到很困惑,因为我认为这个问题会将C#的IEnumerablejava.io.*的流相关联。 - SpaceTrucker
9
请注意,C# 中多次使用 IEnumerable 是一种脆弱的编程模式,因此问题的前提可能存在一些缺陷。虽然许多 IEnumerable 的实现允许这样做,但有些则不允许!代码分析工具通常会警告你不要这样做。 - Sander
显示剩余5条评论
6个回答

387
我有一些关于Streams API早期设计的回忆,这可能会阐明设计理念。在2012年,我们正在为语言添加lambda表达式,并且我们想要使用lambda编程的面向集合或“批量数据”操作,以便促进并行性。到这个时候,惰性地将操作链接在一起的想法已经得到了很好的发展。我们也不希望中间操作存储结果。我们需要决定的主要问题是API中链中的对象长什么样子,以及它们如何与数据源连接。这些源通常是集合,但我们还希望支持来自文件或网络的数据,或者实时生成的数据,例如从随机数生成器中生成的数据。

设计受到了现有工作的许多影响,其中最具影响力的是Google的Guava库和Scala集合库。(如果有人对Guava的影响感到惊讶,请注意Kevin Bourrillion,Guava主要开发者,曾参与JSR-335 Lambda专家组。) 关于Scala集合库,我们发现Martin Odersky的这个演讲特别有意义: Future-Proofing Scala Collections: from Mutable to Persistent to Parallel。(Stanford EE380,2011年6月1日。)

我们当时的原型设计基于 Iterable。熟悉的操作,例如 filtermap 等等,默认都是扩展方法,并且在 Iterable 上实现。调用其中一个操作会将该操作添加到链中,并返回另一个 Iterable。像 count 这样的终端操作会向上遍历整个链,直到源头调用 iterator(),并在每个阶段的迭代器中实现这些操作。
由于这些对象是 Iterable,因此您可以多次调用 iterator() 方法。那么会发生什么呢?
如果源对象是一个集合,这通常运行良好。集合是可迭代的,每次调用 iterator() 都会生成一个不同的独立迭代器实例,每个迭代器都可以独立地遍历集合,非常棒。
现在,如果源是一次性的,比如从文件中读取行,该怎么办?也许第一个迭代器应该获取所有值,但第二个及以后的迭代器应该为空。也许这些值应该在迭代器之间交替。或者每个迭代器都应该获取相同的值。那么,如果你有两个迭代器,其中一个迭代器超前于另一个迭代器会发生什么?某人将不得不缓冲第二个迭代器中的值,直到它们被读取。更糟糕的是,如果你先获得一个迭代器并读取了所有的值,然后才获得第二个迭代器,那么现在的值来自哪里?是否需要将它们全部缓冲起来,以防万一有人想要第二个迭代器?
显然,允许对一次性源进行多次迭代引发了很多问题。我们没有好的答案。我们希望在调用iterator()两次时发生什么具有一致、可预测的行为。这促使我们禁止多次遍历,使管道成为一次性的。
我们还观察到其他人遇到了这些问题。在JDK中,大多数可迭代对象都是集合或类似于集合的对象,允许多次遍历。虽然没有任何地方指定,但似乎有一种未写明的期望,即可迭代对象允许多次遍历。一个值得注意的例外是NIO DirectoryStream接口。其规范包括以下有趣的警告:

尽管DirectoryStream扩展了Iterable,但它不是通用的Iterable,因为它仅支持单个Iterator;调用iterator方法以获取第二个或后续的iterator会抛出IllegalStateException。

[原文中加粗]

这似乎很不寻常和不愉快,我们不想创建一堆可能只能使用一次的新可迭代对象。这使我们远离使用Iterable。

大约在此时,Bruce Eckel的一篇文章描述了他在Scala中遇到的麻烦。他编写了这段代码:

// Scala
val lines = fromString(data).getLines
val registrants = lines.map(Registrant)
registrants.foreach(println)
registrants.foreach(println)

这很简单。它将文本行解析为Registrant对象并打印两次。但实际上它只打印了一次。原来他认为registrants是一个集合,而实际上它是一个迭代器。第二个对foreach的调用遇到了一个空的迭代器,其中所有值都已经被耗尽,因此没有打印任何内容。

这种经历让我们确信,如果尝试多次遍历,则具有清晰可预测的结果非常重要。它还突显了区分类似于流水线的惰性结构和存储数据的实际集合之间的重要性。这反过来推动了将惰性流操作分离到新的Stream接口中,并仅在Collections上直接进行急切的、可变的操作。Brian Goetz已经解释了这样做的原因。

那么允许基于集合的管道进行多次遍历,但不允许非基于集合的管道进行多次遍历呢?这是不一致的,但是是明智的。如果你从网络中读取值,当然你无法再次遍历它们。如果您想多次遍历它们,必须显式地将它们拉入集合中。

但是让我们探索允许从基于集合的管道进行多次遍历。假设你这样做:

Iterable<?> it = source.filter(...).map(...).filter(...).map(...);
it.into(dest1);
it.into(dest2);

如果源对象是一个集合,那么第一次使用into()方法会创建一个从源对象到目标对象的迭代器链,执行流水线操作,并将结果发送到目标对象。第二次使用into()方法将再次创建一个迭代器链,并再次执行流水线操作。这个过程看似没有问题,但实际上会导致过滤和映射操作对每个元素都进行两次。我认为很多程序员都会对这种行为感到惊讶。

正如我之前提到的,我们一直在与Guava开发人员交流。他们拥有一个很酷的Idea Graveyard,其中描述了他们决定实现的功能以及原因。懒加载集合的想法听起来很酷,但以下是他们对它的评价。考虑一个返回ListList.filter()操作:

这里最大的问题是太多操作变成了昂贵的线性时间命题。如果您想要过滤列表并返回列表,而不仅仅是 Collection 或 Iterable,可以使用 ImmutableList.copyOf(Iterables.filter(list, predicate)),它“提前声明”了它正在做什么以及它的代价。
以一个具体的例子来说,List 上的 get(0)size() 的成本是多少?对于像 ArrayList 这样常用的类,它们的复杂度是 O(1)。但是,如果您在一个懒惰过滤的列表上调用其中之一,它必须在后台列表上运行过滤器,所有这些操作都会变成 O(n)。更糟糕的是,它必须在每次操作时遍历后台列表。
这对我们来说似乎是过于懒惰了。设置一些操作并推迟实际执行直到您点击“Go”是一回事,以一种隐藏了潜在的大量重新计算的方式设置东西则是另一回事。
在建议禁止非线性或“不可重用”流时,Paul Sandoz描述了允许它们的潜在后果会导致“意外或混乱的结果”。他还提到并行执行会使事情变得更加棘手。最后,我想补充一点,如果具有副作用的管道操作意外多次执行,或者至少与程序员预期的次数不同,将导致困难和晦涩的错误。 (但是Java程序员不会编写带有副作用的lambda表达式,对吧?他们会吗?)

这就是Java 8 Streams API设计的基本原理,它允许一次性遍历,并要求严格线性(无分支)管道。它在多个不同的流源之间提供一致的行为,清楚地区分惰性和急切操作,并提供一个简单直观的执行模型。


关于IEnumerable,我对C#和.NET远非专家,因此如果我得出任何不正确的结论,我将感激得到(温柔地)纠正。然而,它似乎允许多次遍历在不同的源上表现不同; 它允许嵌套IEnumerable操作的分支结构,这可能导致一些重要的重新计算。虽然我欣赏不同的系统做出不同的权衡,但这是我们在设计Java 8 Streams API时试图避免的两个特征。
OP给出的快速排序示例很有趣,令人困惑,并且我很抱歉说,有些可怕。调用QuickSort需要一个IEnumerable并返回一个IEnumerable,因此直到最终的IEnumerable被遍历之前,实际上并没有进行任何排序。然而,调用似乎建立了一个反映快速排序所做分区的IEnumerable的树形结构,而不实际执行它。 (毕竟这是惰性计算)。如果源具有N个元素,则该树将在其最宽处为N个元素,在lg(N)个级别深处。
我觉得——再次强调,我不是C#或.NET专家——这将导致某些看似无害的调用变得更加昂贵,例如通过ints.First()选择枢轴。在第一层,当然是O(1)。但考虑到树中深处的分区,在右侧边缘。要计算此分区的第一个元素,必须遍历整个源,这是一个O(N)操作。但由于上面的分区是惰性的,它们必须重新计算,需要O(lg N)比较。因此,选择枢轴将是一个O(N lg N)操作,与整个排序一样昂贵。
但实际上,直到我们遍历返回的IEnumerable时才会进行排序。在标准快速排序算法中,每个分区级别使分区数量翻倍。每个分区只有一半大小,因此每个级别仍保持为O(N)复杂度。分区树高为O(lg N),因此总工作量为O(N lg N)。
使用惰性IEnumerables树,底部有N个分区。计算每个分区需要遍历N个元素,每个元素需要向上比较lg(N)次。因此,计算树底部的所有分区需要O(N ^ 2 lg N)次比较。
(这是正确的吗?我几乎无法相信这一点。请有人为我检查一下。)
无论如何,确实很酷,IEnumerable可以用这种方式构建复杂的计算结构。但是,如果它真的增加了计算复杂度,就像我认为的那样,似乎应该避免以这种方式编程,除非极其小心。

37
首先,感谢您提供了一份非常好的且不带贬低性质的回答!这是迄今为止我获得的最准确、最简洁的解释。就快速排序的例子而言,似乎您说得对,随着递归层数的增加,int型数组“First”变得越来越臃肿。我认为可以通过及时计算“gt”和“lt”(使用ToArray收集结果)来轻松解决这个问题。即便如此,这确实支持了您的观点,即这种编程风格可能会导致意外的性能代价。 - Vitaliy
19
另一方面,根据我使用C#的经验(超过5年),我可以说,一旦出现性能问题(或者更糟糕的是,如果有人引入了副作用),清除“冗余”计算并不那么困难。只是似乎在保证API纯粹性的同时做出了太多妥协,以牺牲类似C#的功能。你确实帮助我调整了我的观点。 - Vitaliy
7
谢谢您公正地交流想法,通过调查和撰写答案,我了解了一些关于C#和.NET的知识。 - Stuart Marks
12
小注释:ReSharper是一款帮助C#编程的Visual Studio扩展插件。对于上面的QuickSort代码,ReSharper会为每次使用ints添加一个警告:“可能对IEnumerable进行多次枚举”。多次使用同一个IEenumerable是可疑的,应该避免。我还想指出这个问题(我已经回答了),它展示了.Net方法的一些注意事项(除了性能差之外):List<T>和IEnumerable的区别 - Kobi
4
很有趣,在ReSharper中有这样的警告。感谢指向您的答案。我不懂C#/.NET,所以我必须仔细研究它,但它似乎表现出与我上面提到的设计问题类似的问题。 - Stuart Marks
显示剩余6条评论

125

背景

虽然问题看起来简单,但实际的答案需要一些背景知识才能理解。如果你想跳到结论部分,请向下滚动...

选择比较点 - 基本功能

使用基本概念,C#的IEnumerable概念更接近于Java的Iterable,它能够创建任意数量的IteratorsIEnumerables 创建 IEnumerators。Java的Iterable则创建Iterators

每个概念的历史相似,因为IEnumerableIterable都有一个基本的动机,就是允许在数据集合的成员上进行“for-each”风格的循环。虽然它们都不仅限于此,而且它们通过不同的进展到达了这个阶段,但这是一个重要的共同特点。

让我们比较该特性:在两种语言中,如果一个类实现了IEnumerable/Iterable,那么该类必须实现至少一个方法(对于C#来说,它是GetEnumerator,而对于Java来说,它是iterator())。在每种情况下,从其中返回的实例(IEnumerator/Iterator)允许您访问数据的当前成员和后续成员。这个特性被用于for-each语法。

选择比较点 - 增强功能

C#中的IEnumerable已经扩展到允许许多其他语言功能(主要与Linq相关)。添加的功能包括选择、投影、聚合等。这些扩展具有使用集合理论的强烈动机,类似于SQL和关系数据库概念。

Java 8也增加了一些功能,使得使用流和Lambda可以进行一定程度的函数式编程。注意,Java 8的流主要不是受到集合理论的驱动,而是受到函数式编程的驱动。不过,它们之间有很多相似之处。

这是第二点。对于C#所作的增强,实际上是在 IEnumerable 的概念基础上进行了改进。而在Java中,所作的增强则是通过创建 Lambdas 和 Streams 这两个新的基本概念来实现的,并且还创建了一种相对简单的方法来将 IteratorsIterables 转换为 Streams,反之亦然。

因此,将 IEnumerable 与 Java 的 Stream 概念进行比较是不完整的。你需要将其与 Java 中的组合 Streams 和 Collections API 进行比较。

In Java,Streams 与 Iterables 或 Iterators 不同

Streams 的设计并不是为了像 Iterators 一样解决问题:

  • Iterators 是描述数据序列的一种方式。
  • Streams 则是描述数据变换序列的一种方式。

使用 Iterator 时,你会获得一个数据值,处理它,然后获得另一个数据值。

而在使用 Streams 时,你将一系列函数链接在一起,然后将输入值提供给流,并从组合的序列中获取输出值。注意,在 Java 中,每个函数都封装在一个单独的 Stream 实例中。Streams API 允许你以一种链接转换表达式的方式链接 Stream 实例的序列。

为了完成 Stream 概念,你需要一个数据源来提供流的输入,并且需要一个终端函数来消耗流。

你将值提供到流中的方式实际上可能是来自一个 Iterable,但 Stream 序列本身并不是一个 Iterable,它是一个复合函数。

Stream 还被设计成是延迟执行的,也就是说,只有在请求从流中获取值时才会执行工作。

请注意这些 Streams 的重要假设和特点:

  • Stream 是 Java 中的一个转换引擎,它将数据项从一种状态转换为另一种状态。
  • Streams 没有数据顺序或位置的概念,它们只是简单地转换它们被要求转换的任何东西。
  • Streams 可以从许多源(包括其他 Streams、Iterators、Iterables、Collections)中获取数据。
  • 您不能“重置”一个 Stream,那就像“重新编程转换”。重置数据源可能是您想要的。
  • 逻辑上,在 Stream 中任何时候只有 1 个数据项“在路上”(除非 Stream 是并行 Stream,在这种情况下,每个线程有 1 个数项)。这与数据源无关,数据源可能具有大于当前条目“准备”供应到 Stream 或者 Stream collector 需要聚合和减少多个值的数据。
  • Streams 可以是无限制的,仅由数据源或 collector 限制(collector 也可以是无限制的)。
  • Streams 可以“链接”,过滤一个 Stream 的输出是另一个 Stream。输入到 Stream 并通过 Stream 转换的值可以转而提供给另一个执行不同转换的 Stream。数据以其转换后的状态从一个 Stream 流向下一个 Stream。您不需要干预并从一个 Stream 拉取数据并将其插入到下一个 Stream 中。

C# Comparison

当您考虑 Java Stream 只是供应、Stream 和 Collect 系统的一部分,并且 Stream 和 Iterators 经常与 Collections 一起使用时,就不难理解为什么很难将这些概念与在 C#中几乎全部嵌入到单个 IEnumerable 概念中的相同概念联系起来了。

IEnumerable 的某些部分(和密切相关的概念)在所有 Java Iterator、Iterable、Lambda 和 Stream 概念中都很明显。

Java 概念可以做一些在 IEnumerable 中更难实现的小事情,反之亦然。


结论

  • 这里没有设计问题,只是语言间概念匹配的问题。
  • Streams 通过不同的方法解决问题
  • Streams 添加了功能到 Java(它们添加了一种不同的做事方式,而不是减少了功能)

添加 Streams 在解决问题时给您更多的选择,这可以公正地分类为“增强能力”,而不是“减少”、“拿走”或者“限制”它。

为什么 Java Streams 只能使用一次?

这个问题是错误的,因为 Streams 是函数序列,而不是数据。根据馈送 Stream 的数据源,您可以重置数据源并馈送相同或不同的 Stream。

与 C#的 IEnumerable 不同,在 Java 中,Stream 只能“迭代”一次。

比较一个 IEnumerable 和一个 Stream 是不合适的。你用来表达 IEnumerable 可以无限次执行的上下文最好与 Java 的 Iterables 比较,后者可以被迭代多次。Java 的 Stream 表示 IEnumerable 概念的子集,而不是提供数据的子集,因此不能“重新运行”。

任何对终止操作的调用都会关闭流,使其无法使用。这种“特性”削弱了它很多的能力。

第一句话在某种意义上是正确的。"削弱能力"的说法并不准确。你仍在将 Streams 与 IEnumerables 进行比较。流中的终结操作就像 for 循环中的“break”语句。如果你需要,你总是可以自由地再创建一个流,并重新提供所需的数据。同样,如果你将 IEnumerable 视为类似于 Iterable,对于这个陈述,Java 做得非常好。

我想这种限制背后的设计考虑并不是技术上的。这个奇怪的限制背后有什么设计考虑?

原因是技术上的,因为 Stream 是你认为的一个子集。流子集不控制数据提供,因此你应该重置提供而不是流。在那种情况下,这并不奇怪。

快速排序示例

你的快速排序示例具有以下签名:

IEnumerable<int> QuickSort(IEnumerable<int> ints)

你正在将输入的IEnumerable视为数据源:

IEnumerable<int> lt = ints.Where(i => i < pivot);

此外,返回值也是 IEnumerable,它是一组数据集供应。由于这是一个排序操作,因此该供应的顺序非常重要。如果你认为 Java 中的 Iterable 类是这个操作的适当匹配,特别是 List 的可迭代专业化,因为 List 是具有保证顺序或迭代的数据集供应,那么相当于你代码的 Java 代码将是:

Stream<Integer> quickSort(List<Integer> ints) {
    // Using a stream to access the data, instead of the simpler ints.isEmpty()
    if (!ints.stream().findAny().isPresent()) {
        return Stream.of();
    }

    // treating the ints as a data collection, just like the C#
    final Integer pivot = ints.get(0);

    // Using streams to get the two partitions
    List<Integer> lt = ints.stream().filter(i -> i < pivot).collect(Collectors.toList());
    List<Integer> gt = ints.stream().filter(i -> i > pivot).collect(Collectors.toList());

    return Stream.concat(Stream.concat(quickSort(lt), Stream.of(pivot)),quickSort(gt));
}    

请注意,存在一个缺陷(我已经复制了该缺陷),排序不能优雅地处理重复值,它是“唯一值”排序。

还要注意Java代码在不同点使用数据源(List)和流概念,在C#中,这两个“个性”可以仅用IEnumerable表示。此外,虽然我使用List作为基本类型,但我可以使用更通用的Collection,并通过小的迭代器到流转换,我可以使用更通用的 Iterable


9
如果你想要“迭代”一个流,那你的做法是错误的。一个流代表了数据在转换链中某一时刻的状态。数据从流源头进入系统,然后从一个流传输到另一个流,在传输过程中不断改变状态,直到最终被收集、减少或丢弃。Stream是一个时间点的概念,而不是一个“循环操作”...(cont.) - rolfl
7
使用数据流,输入数据看起来像X,离开数据流后看起来像Y。有一个函数在数据流中执行该转换f(x)。数据流封装了这个函数,但不封装流经的数据。 - rolfl
4
IEnumerable 可以提供随机值,没有限制,并且可以在数据存在之前就被激活。 - Arturo Torres Sánchez
6
许多接收IEnumerable<T>参数的方法预期它表示一个有限集合,可以被多次迭代。一些可迭代的对象不符合这些条件,但仍实现了IEnumerable<T>接口,因为没有其他合适的标准接口可用。但是,如果给定了不符合这些条件的可迭代对象,那么期望可迭代多次的有限集合的方法可能会崩溃。 - supercat
5
如果您的 quickSort 示例返回一个 Stream,那么它会变得更加简单;这将节省两个 .stream() 调用和一个 .collect(Collectors.toList()) 调用。然后,如果您使用 Stream.of(pivot) 替换 Collections.singleton(pivot).stream(),代码几乎就能读懂了… - Holger
显示剩余10条评论

22

Stream 是围绕着有状态、可变的 Spliterator 对象构建的。它们没有“重置”操作,事实上,需要支持这样的重复操作会 “带走很多力量”。那么 Random.ints() 应该如何处理这样的请求呢?

另一方面,对于具有可追踪来源的 Stream,很容易构造等效的 Stream 以便再次使用。只需将构建 Stream 的步骤放入可重用的方法中即可。请记住,重复执行这些步骤不是昂贵的操作,因为所有这些步骤都是延迟操作;真正的工作从终端操作开始,并且根据实际终端操作执行完全不同的代码。

由您作为这种方法的编写者来指定调用该方法两次意味着什么:它是否产生与未修改的数组或集合创建的流完全相同的序列,还是生成具有类似语义但不同元素的流,例如随机整数流或控制台输入行流等。


顺便说一下,为了避免混淆,终端操作消耗Stream,这与调用流的 关闭close() 是不同的(对于具有关联资源的流例如由 Files.lines() 生成的流,这是必需的)。


似乎很多混淆都源自误导性地将 IEnumerableStream 进行比较。一个 IEnumerable 表示提供实际 IEnumerator 的能力,因此它类似于 Java 中的 Iterable。相比之下,Stream 是一种迭代器,并且可与 IEnumerator 相比,因此错误地声称这种数据类型可以在 .NET 中多次使用,对 IEnumerator.Reset 的支持是可选的。这里讨论的例子更多地利用了一个 IEnumerable 可以用于获取 新的 IEnumerator 的事实,并且 Java 的 Collection 也可以这样使用;您可以获得一个新的 Stream。如果 Java 开发人员决定直接将 Stream 操作添加到 Iterable 中,则返回另一个 Iterable 的中间操作非常可比,并且可以按照相同的方式工作。

然而,开发人员最终决定不这样做,并且这个决定已经在这个问题中讨论过。最大的争议点是急切的集合操作和惰性的流操作之间的混淆。通过查看.NET API,我个人认为这是合理的。尽管仅查看IEnumerable看起来很合理,但特定的集合将有许多直接操纵集合的方法和许多返回惰性IEnumerable的方法,而方法的特定性质并不总是直观可识别的。我找到的最糟糕的例子(就我花费的几分钟时间而言)是List.Reverse(),其名称与继承的名称完全匹配(这是扩展方法的正确术语吗?),Enumerable.Reverse(),但具有完全相反的行为。


当然,这些是两个不同的决策。第一个是使Stream成为与Iterable/Collection不同的类型,第二个是将Stream作为一种一次性迭代器而不是另一种可迭代类型。但是这些决策是同时做出的,也许从来没有考虑过将这两个决策分开。它并不是为了与.NET相比而创建的。

实际的API设计决策是添加了一个改进版的迭代器类型,即SpliteratorSpliterator可以由旧的Iterable(这是它们如何被改良的方式)或全新的实现提供。然后,Stream被添加为相对较低级别的Spliterator的高级前端。就是这样。您可以讨论是否存在更好的设计,但这并不具有生产力,鉴于它们现在的设计方式,它不会改变。

还有另一个实现方面需要考虑。 Stream不是不可变的数据结构。每个中间操作都可能返回一个封装旧Stream的新Stream实例,但它也可能直接操纵自己的实例并返回自身(这并不排除使用相同操作进行两者甚至更多次,例如parallelunordered操作不添加另一个步骤,而是操作整个管道)。拥有这样的可变数据结构并尝试重用(甚至更糟的是,在同一时间使用多次)会导致问题...


为了完整起见,以下是将您的快速排序示例转换为Java Stream API的示例。它表明它确实没有“削弱太多力量”。

static Stream<Integer> quickSort(Supplier<Stream<Integer>> ints) {

  final Optional<Integer> optPivot = ints.get().findAny();
  if(!optPivot.isPresent()) return Stream.empty();

  final int pivot = optPivot.get();

  Supplier<Stream<Integer>> lt = ()->ints.get().filter(i -> i < pivot);
  Supplier<Stream<Integer>> gt = ()->ints.get().filter(i -> i > pivot);

  return Stream.of(quickSort(lt), Stream.of(pivot), quickSort(gt)).flatMap(s->s);
}

它可以像这样使用

List<Integer> l=new Random().ints(100, 0, 1000).boxed().collect(Collectors.toList());
System.out.println(l);
System.out.println(quickSort(l::stream)
    .map(Object::toString).collect(Collectors.joining(", ")));

你甚至可以更加简洁地编写:

static Stream<Integer> quickSort(Supplier<Stream<Integer>> ints) {
    return ints.get().findAny().map(pivot ->
         Stream.of(
                   quickSort(()->ints.get().filter(i -> i < pivot)),
                   Stream.of(pivot),
                   quickSort(()->ints.get().filter(i -> i > pivot)))
        .flatMap(s->s)).orElse(Stream.empty());
}

1
无论是消耗还是没有消耗,尝试再次消耗都会抛出一个异常,指示流已经被关闭而不是被消耗。关于重置随机整数流的问题,正如您所说的,由库的编写者定义重置操作的确切合同。 - Vitaliy
2
不,消息是“流已经被操作或关闭”,我们并没有谈论“重置”操作,而是在Stream上调用两个或多个终端操作,而源Spliterator的重置将被隐含。我非常确定如果这是可能的,那么就会有关于“为什么在Stream上两次调用count()每次都给出不同结果”的问题等等在SO上提出... - Holger
1
count() 给出不同结果是完全有效的。count() 是对流的查询,如果流是可变的(或者更准确地说,流表示对可变集合的查询结果),那么这是可以预期的。看看 C# 的 API。他们优雅地处理了所有这些问题。 - Vitaliy
4
你所谓的“绝对有效”行为是一种反直觉的行为。毕竟,这是询问是否可以多次使用流以不同方式处理结果(预期相同)的主要动机。到目前为止,有关Stream不可重用性的每个在SO上的问题都源于试图通过多次调用最终操作(显然,否则你就不会注意到)来解决问题,如果Stream API允许这样做,则可能导致解决方案被默默地破坏并产生不同的结果。这里有一个很好的例子 - Holger
3
实际上,你的例子完美地说明了如果程序员不理解应用多个终端操作的影响会发生什么。想象一下,当每个操作将被应用于完全不同的元素集时会发生什么。只有在流的源返回每个查询相同的元素时才能正常工作,但这恰恰是我们谈论的错误假设。 - Holger
显示剩余2条评论

8
我认为当你仔细观察时,两者之间的差异非常少。
表面上看,IEnumerable似乎是一个可重复使用的结构:
IEnumerable<int> numbers = new int[] { 1, 2, 3, 4, 5 };

foreach (var n in numbers) {
    Console.WriteLine(n);
}

然而,编译器实际上会进行一些工作来帮助我们;它会生成以下代码:
IEnumerable<int> numbers = new int[] { 1, 2, 3, 4, 5 };

IEnumerator<int> enumerator = numbers.GetEnumerator();
while (enumerator.MoveNext()) {
    Console.WriteLine(enumerator.Current);
}

每次您遍历可枚举对象时,编译器会创建一个枚举器。枚举器不可重用;进一步调用“MoveNext”将只返回false,并且没有办法将其重置到开头。如果您想再次遍历数字,则需要创建另一个枚举器实例。
为了更好地说明IEnumerable具有与Java Stream相同的“特性”,考虑一个数字源不是静态集合的可枚举对象。例如,我们可以创建一个可枚举对象,它生成5个随机数的序列:
class Generator : IEnumerator<int> {
    Random _r;
    int _current;
    int _count = 0;

    public Generator(Random r) {
        _r = r;
    }

    public bool MoveNext() {
        _current= _r.Next();
        _count++;
        return _count <= 5;
    }

    public int Current {
        get { return _current; }
    }
 }

class RandomNumberStream : IEnumerable<int> {
    Random _r = new Random();
    public IEnumerator<int> GetEnumerator() {
        return new Generator(_r);
    }
    public IEnumerator IEnumerable.GetEnumerator() {
        return this.GetEnumerator();
    }
}

现在我们有了与之前基于数组的可枚举代码非常相似的代码,但是需要对 numbers 进行第二次迭代:

IEnumerable<int> numbers = new RandomNumberStream();

foreach (var n in numbers) {
    Console.WriteLine(n);
}
foreach (var n in numbers) {
    Console.WriteLine(n);
}

第二次迭代numbers时,我们将获得不同的数字序列,其不能以相同的方式重复使用。或者,我们可以编写RandomNumberStream以在尝试多次迭代时抛出异常,使可枚举实际上无法使用(类似于Java Stream)。
此外,当应用于RandomNumberStream时,基于可枚举对象的快速排序是什么意思?
结论
因此,最大的区别在于.NET允许您通过在需要访问序列中的元素时在后台隐式创建新的IEnumerator来重复使用IEnumerable
这种隐式行为通常很有用(正如您所说的“强大”),因为我们可以反复迭代集合。
但有时,这种隐式行为实际上可能会引起问题。如果您的数据源不是静态的或访问成本很高(例如数据库或网站),则必须放弃对IEnumerable的许多假设;重用不是那么简单。

2

在Stream API中,我们可以绕过一些“仅运行一次”保护措施;例如,我们可以通过引用和重复使用Spliterator(而不是直接使用Stream)来避免java.lang.IllegalStateException异常(带有消息“stream has already been operated upon or closed”)。

例如,以下代码将在不抛出异常的情况下运行:

    Spliterator<String> split = Stream.of("hello","world")
                                      .map(s->"prefix-"+s)
                                      .spliterator();

    Stream<String> replayable1 = StreamSupport.stream(split,false);
    Stream<String> replayable2 = StreamSupport.stream(split,false);


    replayable1.forEach(System.out::println);
    replayable2.forEach(System.out::println);

然而,输出将受到限制。
prefix-hello
prefix-world

我们应该避免重复输出两次。这是因为作为Stream源的ArraySpliterator是有状态的,并且存储其当前位置。当我们回放这个Stream时,我们会从结尾重新开始。

我们有几种选项来解决这个问题:

  1. We could make use of a stateless Stream creation method such as Stream#generate(). We would have to manage state externally in our own code and reset between Stream "replays":

    Spliterator<String> split = Stream.generate(this::nextValue)
                                      .map(s->"prefix-"+s)
                                      .spliterator();
    
    Stream<String> replayable1 = StreamSupport.stream(split,false);
    Stream<String> replayable2 = StreamSupport.stream(split,false);
    
    
    replayable1.forEach(System.out::println);
    this.resetCounter();
    replayable2.forEach(System.out::println);
    
  2. Another (slightly better but not perfect) solution to this is to write our own ArraySpliterator (or similar Stream source) that includes some capacity to reset the current counter. If we were to use it to generate the Stream we could potentially replay them successfully.

    MyArraySpliterator<String> arraySplit = new MyArraySpliterator("hello","world");
    Spliterator<String> split = StreamSupport.stream(arraySplit,false)
                                            .map(s->"prefix-"+s)
                                            .spliterator();
    
    Stream<String> replayable1 = StreamSupport.stream(split,false);
    Stream<String> replayable2 = StreamSupport.stream(split,false);
    
    
    replayable1.forEach(System.out::println);
    arraySplit.reset();
    replayable2.forEach(System.out::println);
    
  3. The best solution to this problem (in my opinion) is to make a new copy of any stateful Spliterators used in the Stream pipeline when new operators are invoked on the Stream. This is more complex and involved to implement, but if you don't mind using third party libraries, cyclops-react has a Stream implementation that does exactly this. (Disclosure: I am the lead developer for this project.)

    Stream<String> replayableStream = ReactiveSeq.of("hello","world")
                                                 .map(s->"prefix-"+s);
    
    
    
    
    replayableStream.forEach(System.out::println);
    replayableStream.forEach(System.out::println);
    
这将打印出来。
prefix-hello
prefix-world
prefix-hello
prefix-world

如预期所料。


0

原因在于您可以从一些仅能按定义使用一次的东西(例如Iterator或BufferedReader)创建流。您可以将Stream视为与使用BufferedReader读取文本文件到其末尾相同的方式被消耗。一旦到达文件结尾,BufferedReader并不会停止存在,但它变得无用,因为您不能再从中获取任何内容。如果您想再次读取文件,则必须创建新的读取器。流也是如此。如果要两次处理流的源,则必须创建两个单独的流。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接