Stream.parallel() 方法会更新 spliterator 的特征吗?

6
这个问题基于这个问题的回答 What is the difference between Stream.of and IntStream.range? 由于IntStream.range生成的是一个已经排序好的流,下面代码的输出仅为0
IntStream.range(0, 4)
         .peek(System.out::println)
         .sorted()
         .findFirst();

此外,分隔器将具有“已排序”特性。以下代码返回“true”:
System.out.println(
    IntStream.range(0, 4)
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

现在,如果我在第一个代码中引入parallel(),那么输出将包含从03的所有4个数字,但顺序是随机的,因为由于parallel(),流不再排序。
IntStream.range(0, 4)
         .parallel()
         .peek(System.out::println)
         .sorted()
         .findFirst();

这将产生类似于以下内容的结果:(以任意随机顺序)
2
0
1
3

因此,我认为由于使用parallel()SORTED属性已被删除。但是,下面的代码也会返回true
System.out.println(
    IntStream.range(0, 4)
             .parallel()
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

为什么 parallel() 方法不会改变 SORTED 属性?并且由于所有的四个数字都被打印出来了,即使 SORTED 属性仍然存在,Java 如何意识到流不是已排序的?

1
不是特征,它们对于您的代码块都是相同的。您可以使用spliterator.characteristics()进行验证。但是,是的,我们仍然对主要问题感兴趣,即获得一个回答来完整解释“什么”,但是也包括“为什么”和“如何”。在我看来,现有的两个答案都不够。 - Naman
1
@Naman 请告诉我你对这个的看法? - Eugene
2个回答

5

如何确切地实现这一点很大程度上取决于具体情况,你需要深入源代码中去探究原因。基本上,串行和并行流水线的处理方式是不同的。查看AbstractPipeline.evaluate,检查isParallel(),然后根据流水线是否为并行来执行不同的操作。

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

如果您查看SortedOps.OfInt,您将看到它重写了两个方法:

@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
    Objects.requireNonNull(sink);

    if (StreamOpFlag.SORTED.isKnown(flags))
        return sink;
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedIntSortingSink(sink);
    else
        return new IntSortingSink(sink);
}

@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Integer[]> generator) {
    if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
    }
    else {
        Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

        int[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
    }
}

如果是顺序流,则最终将调用opWrapSink,而如果是并行流,则会调用opEvaluateParallel(顾名思义)。请注意,如果管道已经排序,opWrapSink不会对给定的sink执行任何操作(仅以原样返回),但opEvaluateParallel始终会评估分裂器。
另请注意,并行性和排序性不是互斥的。您可以拥有具有这些特征组合的流。
“排序”是Spliterator的特征。它在技术上不是Stream的特征(像“parallel”一样)。当然,parallel可以创建具有全新特征的全新分裂器流(从原始分裂器获取元素),但为什么要这样做呢?您无论如何都必须单独处理并行和顺序流。

我只是在调试答案的其余部分 :-),但我想在答案的最后一部分得到澄清。如果我们链接并行和顺序操作,那么操作模式是否对应于最后一个?即,如果最后指定了顺序,则流将是顺序的,即使最初流是并行的,甚至在我们之间使用了 parallel() 也是如此? - Gautham M
1
@GauthamM 我尝试通过另一种方式解释为什么“sorted”没有重置。请参见编辑。 - Sweeper
所以这是我理解的。首先,实际上是流操作标志使得差异,而不是分割器特性。 其次,如果顺序管道也总是评估即使存在SORTED标志,那么输出将是0123(有序)而不是0。这正确吗? - Gautham M
我也没有说“即使是排序过的,连续计算仍然会进行...”,我的意思是想问->如果它总是被计算(而不是当前的实现),那么输出会是0123吗? - Gautham M
1
@GauthamM 哦,我现在明白你的意思了 :) 是的,没错。这就是使用 IntStream.of(1,2,3,4)(没有排序特性)会做到的。 - Sweeper
显示剩余3条评论

4
你需要退一步,考虑如何解决这样一个问题,考虑到ForkJoinPool用于并行流,并且它基于工作窃取的原理。如果你了解Spliterator的工作原理,那将非常有帮助。有关详细信息在此处
你有一个特定的流,你将其“分割”(非常简化)成小块,并将所有这些块交给ForkJoinPool执行。所有这些块都由单独的线程独立地处理。由于我们在这里谈论的是线程,显然没有事件序列,事情是随机发生的(这就是为什么你看到随机顺序输出的原因)。
如果您的流保留了顺序,那么终端操作也应该保留它。因此,虽然中间操作可以以任何顺序执行,但是如果在终端操作之前的流是有序的,则终端操作将按顺序处理元素。简单来说:
System.out.println(
    IntStream.of(1,2,3)
             .parallel()
             .map(x -> {System.out.println(x * 2); return x * 2;})
             .boxed()
             .collect(Collectors.toList()));

map会按照未知的顺序处理元素(请记住ForkJoinPool和线程),但是collect会按照“从左到右”的顺序接收元素。


现在,如果我们将其推广到您的示例:当您调用parallel时,流被分成小块并进行处理。例如,看看这个是如何分割的(仅一次)。
Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
         .parallel()
         .boxed()
         .sorted()
         .spliterator()
         .trySplit(); // trySplit is invoked internally on parallel

spliterator.forEachRemaining(System.out::println);

在我的机器上它会打印出1,2,3,4。这意味着内部实现将流分成了两个Spliterator: leftrightleft[1, 2, 3, 4],而right[5, 6, 7, 8]。但这还不是全部,这些Spliterator可以进一步分割。例如:
Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
         .parallel()
         .boxed()
         .sorted()
         .spliterator()
         .trySplit()
         .trySplit()
         .trySplit();

spliterator.forEachRemaining(System.out::println);

如果您尝试再次调用trySplit,您将得到一个null - 这意味着,那就是它,我无法再分割了。

因此,您的流:IntStream.range(0, 4)将被拆分为4个Spliterator。每个线程都会单独处理一个Spliterator。如果您的第一个线程知道它当前正在处理的这个Spliterator是“最左边”的一个,那就是它!其余的线程甚至不需要开始他们的工作-结果已知。

另一方面,可能是这个具有“最左边”元素的Spliterator是最后一个启动的。因此,前三个可能已经完成了他们的工作(因此在您的示例中调用了peek),但他们没有“产生”所需的结果。

事实上,这就是内部完成的方式。您不需要理解代码-但流程和方法名称应该很明显。


谢谢。我想抽出单独的时间来阅读这个内容。这是我理解的(假设有4个线程)-> range(0,9).parallel().peek(print).sorted().findFirst()。在这里,由于我们使用了sorted()parallel(), findFirst会等待所有4个线程完成。因此,即使处理0线程的线程首先完成,输出始终是0到8的随机顺序。 (接下来的内容请看我的下一条评论) - Gautham M
接下来,我将通过在sorted之后添加类似的“peek”来更改代码。range(0,9).parallel().peek(print).sorted().peek(print).findFirst()。这里,在第二个“peek”的一部分,即使处理带有0的线程,其余的3个线程也会继续它们的操作(假设它们分别为4、7、1),因此也会打印出“4,7,1”。但是,没有新元素会被任何线程选中进行处理。因此,至少会始终打印4(线程数)个数字。为什么一旦找到结果,其他线程就不立即被中断/取消呢? - Gautham M
另外,在 doLeaf 函数内,为什么我们没有检查节点是否已经被取消。如果它已经被取消了,有必要调用 sinkSupplier.get() 吗?难道我们不能直接继续执行 cancelLaterNodes 吗? - Gautham M

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接