理解Java 8和Java 9中的顺序流与并行流分割器

5

一道关于Spliterator的问题,乍一看并不直接。

在流(Stream)中,.parallel()会改变流处理的行为。然而,我本来以为从Sequential Stream和Parallel Stream创建出来的Spliterator是一样的。例如,在Sequential Stream中,通常不会调用.trySplit(),而在Parallel Stream中,则会为了将切割后的Spliterator交给另一个线程。

stream.spliterator()stream.parallel().spliterator()之间的区别:

  1. They may have different characteristics:

    Stream.of(1L, 2L, 3L).limit(2);            // ORDERED
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
    

看起来又有一个无意义的流分割迭代器特征策略(并行计算似乎更好)在这里讨论:深入理解Java 8和Java 9中的Spliterator特征

  1. They may have different behaviour in terms of splitting using .trySplit():

    Stream.of(1L, 2L, 3L);                     // NON NULL
    Stream.of(1L, 2L, 3L).limit(2);            // NULL
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
    

为什么最后两个行为不同?如果我想拆分一个顺序流,为什么我不能这样做?(例如,快速处理时丢弃其中一个拆分可能有用)。

  1. Big impacts when transforming a spliterators to a stream:

    spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator();
    stream = StreamSupport.stream(spliterator, true); // No parallel processing!
    
在这种情况下,从顺序流创建了一个分裂器,其禁用了分割的能力(.trySplit()返回null)。当稍后需要转换回流时,该流将不会受益于并行处理。真遗憾。
重要问题是:作为解决方法,总是在调用.spliterator()之前将流转换为并行流的主要影响是什么?
// Supports activation of parallel processing later
public static <T> Stream<T> myOperation(Stream<T> stream) {
    boolean isParallel = stream.isParallel();
    Spliterator<T> spliterator = stream.parallel().spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // My implementation of the interface here (omitted for clarity)
    }, isParallel).onClose(stream::close);
}

// Now I have the option to use parallel processing when needed:
myOperation(stream).skip(1).parallel()...

你为什么期望从顺序流和并行流创建的分割器是相同的?你的问题最终不会转化为并行流与顺序流的比较吗? - Naman
单一职责原则。Spliterator 只是一个 Spliterator,他只知道如何拆分迭代器。流应该具有创建线程的逻辑,而不是 Spliterator。(这是期望的) - Tet
2
也许你还记得,这种特定于实现的行为已经在 这个答案的评论中 讨论过了。在那里,Tagir Valeev 发表了有趣的评论:“另一方面,你不能盲目地使用 parallel(),因为它可能会意外地并行执行某些操作(比如排序),从而消耗更多的 CPU 核心”,这部分回答了你的问题(总是将流转换为并行产生的主要影响)... - Holger
感谢@Holger。我如何更好地理解影响?这些操作是在什么时候完成的?只有在拆分期间吗?(如果是,可能不太有问题,因为顺序流从不调用.trySplit())。为什么需要执行这些操作?有没有任何地方可以获取该信息?(顺便说一句:这就是我正在寻找的答案) - Tet
1个回答

7
这不是spliterator的通用属性,而只适用于封装流水线的包装spliterator。
当您在调用从spliterator生成且没有链接操作的流的spliterator()时,无论流的parallel状态如何,您都将获得源spliterator,该spliterator可能支持或不支持trySplit
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "foo", "bar", "baz");
Spliterator<String> sp1 = list.spliterator(), sp2=list.stream().spliterator();
// true
System.out.println(sp1.getClass()==sp2.getClass());
// not null
System.out.println(sp2.trySplit());

同样地

Spliterator<String> sp = Stream.of("foo", "bar", "baz").spliterator();
// not null
System.out.println(sp.trySplit());

但是,只要在调用spliterator()之前链接操作,您将获得一个包装流管道的分裂器。现在,可以实现执行相关操作的专用分裂器,例如LimitSpliteratorMappingSpliterator,但这尚未完成,因为将流转换回分裂器被认为是最后的手段,当其他终端操作不适合时才使用,而不是高优先级的用例。相反,您将始终获得单个实现类的实例,该实现类尝试将流管道实现的内部工作转换为分裂器API。
对于有状态操作(特别是sorteddistinct或非SIZED流的skiplimit),这可能会很复杂。对于简单的无状态操作,例如mapfilter,提供支持将更加容易,正如代码注释中所述

Abstract wrapping spliterator that binds to the spliterator of a pipeline helper on first operation. This spliterator is not late-binding and will bind to the source spliterator when first operated on. A wrapping spliterator produced from a sequential stream cannot be split if there are stateful operations present.

…

   // @@@ Detect if stateful operations are present or not
   //     If not then can split otherwise cannot

   /**
    * True if this spliterator supports splitting
    */
   final boolean isParallel;
但目前似乎还没有实现这种检测,所有中间操作都被视为有状态的操作。
Spliterator<String> sp = Stream.of("foo", "bar", "baz").map(x -> x).spliterator();
// null
System.out.println(sp.trySplit());

当您试图通过始终调用parallel来解决此问题时,仅在流水线由无状态操作组成时才不会产生影响。但是,在存在有状态操作时,它可能会显着改变行为。例如,当您有一个sorted步骤时,在您可以消耗第一个元素之前,必须缓冲和排序所有元素。对于并行流,即使您从未调用trySplit,它也很可能使用parallelSort

感谢@Holger。即使使用了.sorted(),如果我不调用.parallel(),它也不应该有影响,对吗?例如:StreamSupport.stream(Stream.of("foo", "bar", "baz").map(x -> x).parallel().spliterator(), false).sorted().limit(2).skip(1).findFirst() 如果结果是顺序流,调用.spliterator()之前调用.parallel()会有什么影响? - Tet
2
你误解了。当在spliterator()之前插入parallel(),例如Arrays.stream(array).map(function).sorted(comparator).parallel().spliterator().tryAdvance()时,有状态的操作可能会并行运行,处理整个数组并要求functioncomparator是线程安全的。这将产生重大影响。 - Holger
感谢@Holger。我进行了一些单元测试,即使只是使用.tryAdvanced().forEachRemaining()而没有使用.trySplit(),当涉及到.distict().sorted()操作时也会创建线程。 - Tet

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接