流分割器的实现细节

6

在查看WrappingSpliterator :: trySplit的源代码时,我对其实现非常疑惑:

    @Override
    public Spliterator<P_OUT> trySplit() {
        if (isParallel && buffer == null && !finished) {
            init();

            Spliterator<P_IN> split = spliterator.trySplit();
            return (split == null) ? null : wrap(split);
        }
        else
            return null;
    }

如果您想知道这为何重要,那是因为例如这个示例:
Arrays.asList(1,2,3,4,5)
      .stream()
      .filter(x -> x != 1)
      .spliterator();

正在使用它。据我理解,将任何中间操作添加到流中都会触发该代码的执行。

基本上,这个方法表示除非流是并行的,否则将此Spliterator视为无法分割的Spliterator。这对我很重要。在我的一个方法中(这就是我找到这段代码的方法),我以Stream作为输入,并手动将其“解析”成较小的片段,使用trySplit。例如,您可以认为我正在尝试从Stream中查找最后一个。

这就是我的希望将其分成更小块的愿望被摧毁的地方,因为一旦我这样做:

Spliterator<T> sp = stream.spliterator();
Spliterator<T> prefixSplit = sp.trySplit();

我发现prefixSplitnull,这意味着除了使用forEachRemaning消耗整个sp之外,我基本上不能做其他任何事情。这有点奇怪,也许对于存在filter的情况,可能会有一些意义;因为在这种情况下,我理解中唯一返回Spliterator的方式是使用某种类型的缓冲区,甚至可能是预定义大小(就像Files::lines一样)。但为什么要这样呢:
Arrays.asList(1,2,3,4)
      .stream()
      .sorted()
      .spliterator()
      .trySplit();

返回 null 是我不理解的内容。 sorted 是一个有状态的操作,它在缓冲元素的同时,没有实际减少或增加它们的初始数量,因此至少在理论上,它可以返回其他结果而不是 null...


2
嗯...也许是因为流不是并行的?这是一个显而易见的问题...你尝试过Arrays.asList(1,2,3,4).parallelStream()......吗? - fps
1
当然可以... :) 但是如果你移除 filter 并且流不是并行的,那么分割就会起作用。文档并没有说这必须发生在并行流中。 - Eugene
从javadoc中:
  • <p>此方法可能由于任何原因返回{@code null},
  • 包括空值,无法在遍历后分割,数据结构限制和效率
  • 考虑。
- Wisthler
3
如果您不链式调用中间操作,spliterator() 方法将返回源 Spliterator,例如 Arrays.asList(1,2,3,4,5) .stream() .spliterator() .getClass() == Arrays.spliterator(new Integer[] { 1,2,3,4,5 }) .getClass()。这样的 Spliterator 甚至不知道 Stream 是否是并行的,或者是否存在 Stream。而且,请注意,Arrays.asList(1,2,3,4,5) .parallelStream() .filter(x -> x != 1) .spliterator(); 不需要任何缓冲。 - Holger
3
@Holger 对,如果没有中间操作,源Spliterator就会被返回,甚至 WrappingSpliterator 也可以通过 spliteratorSupplier 获取到它。第二点是,我认为它可能是使用缓冲区实现的,现在我查看了实现,你是对的,它会分割源Spliterator……那么在这种情况下问题就是这是否是序列流的有意决定?你知道这样做的原因吗?谢谢 - Eugene
显示剩余2条评论
1个回答

1
当您在Stream上调用spliterator()时,根据当前实现,只有两种可能的结果。
如果流没有中间操作,则会获得用于构造流的源分割器,其分割能力完全独立于流的并行状态,实际上,分割器对流一无所知。
否则,您将获得一个WrappingSpliterator,它将封装源Spliterator和管道状态,表示为PipelineHelper。这个SpliteratorPipelineHelper的组合不需要并行工作,事实上,在distinct()的情况下将无法工作,因为WrappingSpliterator将根据流是并行还是非并行而获得完全不同的组合。

对于无状态的中间操作,忽略并行标志不会有影响。但是,正如在“为什么stream.spliterator()的tryAdvance可能会将项目累积到缓冲区?”中讨论的那样,WrappingSpliterator是一个“一刀切”的实现,它不考虑管道的实际性质,所以其限制是所有支持的管道阶段可能存在的限制的超集。因此,只要存在一个场景在忽略parallel标志时无法工作,就足以禁止在非parallel情况下分割所有流水线。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接