Java Spliterator持续拆分并行流

6

我在Java并行流中发现了一些令人惊讶的行为。我自己创建了一个Spliterator,结果并行流被划分到每个流只包含一个元素。这似乎太小了,我想知道我做错了什么。我希望有一些特性可以设置来纠正这一点。

下面是我的测试代码。这里的Float仅是一个虚拟载荷,我的真实流类要复杂得多。

   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 10 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total: " + total );
   }

这段代码将不断分割流,直到每个Spliterator只有一个元素。这似乎过于繁琐,不够高效。

输出:

run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)

这是Spliterator的代码。我的主要关注点是应该使用哪些特性,但也许还有其他问题吗?
public class TestingSpliterator implements Spliterator<Float> {
   int count;
   int splits;

   public TestingSpliterator( int count ) {
      this.count = count;
   }

   @Override
   public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
      if( count > 0 ) {
         cnsmr.accept( (float)Math.random() );
         count--;
         return true;
      } else
         return false;
   }

   @Override
   public Spliterator<Float> trySplit() {
      System.err.println( "Split on count: " + count );
      if( count > 1 ) {
         splits++;
         int half = count / 2;
         TestingSpliterator newSplit = new TestingSpliterator( count - half );
         count = half;
         return newSplit;
      } else
         return null;
   }

   @Override
   public long estimateSize() {
      return count;
   }

   @Override
   public int characteristics() {
      return IMMUTABLE | SIZED;
   }
}

那么如何将流分割成更大的块呢?我希望每个块的大小在10,000到50,000左右会更好。我知道可以从trySplit()方法中返回null,但那似乎是一种错误的做法。系统应该根据核心数、当前负载以及使用流的代码的复杂度等因素自行调整,从而外部配置流块大小,而不是由流本身内部固定。编辑:关于下面Holger的答案,当我增加原始流中的元素数量时,流分片会相对减少,所以StreamSupport最终会停止分片。在初始流大小为100个元素时,当达到2个流大小时(我屏幕上看到的最后一行是Split on count: 4),StreamSupport会停止分割。对于初始流大小为1000个元素的情况,每个单独的流块的最终大小约为32个元素。编辑第二部分:在查看以上输出之后,我更改了我的代码以列出创建的各个Spliterator。以下是更改内容:
   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 100 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total Spliterators: " + testers.size() );
      for( TestingSpliterator t : testers ) {
         System.out.println( "Splits: " + t.splits );
      }
   } 

并且对于 TestingSpliterator 的构造函数:

   static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();

   public TestingSpliterator( int count ) {
      this.count = count;
      testers.add( this ); // OUCH! 'this' escape
   }

这段代码的结果是第一个Spliterator被分成5个部分。下一个Spliterator被分成4个部分。接下来的一组Spliterators被分成3个部分。以此类推。结果是创建了36个Spliterators,流被分成相同数量的部分。在典型的桌面系统上,这似乎是API认为最适合并行操作的方式。 我将接受下面Holger的答案,基本上是StreamSupport类正在做正确的事情,不要担心,开心点。对于我来说问题的一部分是我在非常小的流大小上进行早期测试,我对拆分的次数感到惊讶。不要犯同样的错误。
2个回答

3
你从错误的角度来看待这个问题。实现并没有将“直到每个分裂器只剩一个元素”拆分,而是将其拆分为“直到有十个分裂器”。
单个分裂器实例只能由一个线程处理。一旦开始遍历分裂器,它不需要支持拆分。因此,任何未使用之前的拆分机会可能会导致之后的有限并行处理能力。
请记住,Stream 实现使用了一个未知工作负载的 ToDoubleFunction¹。在你的情况下,它并不知道这个函数像 Float::doubleValue 一样简单。它可能需要一分钟才能评估,然后每个 CPU 核心都有一个分裂器就是正确的决策。即使有超过 CPU 核心数量的分裂器也是处理某些评估明显比其他评估花费更长时间的可能性的有效策略。
典型的初始分裂器数量将是“CPU 核心数 × 4”,但是当有关实际工作负载的更多信息存在时,可能会有更多的拆分操作。当您的输入数据少于该数字时,将其拆分为每个分裂器仅剩一个元素是很常见的。
你可以尝试使用“new TestingSpliterator(10000)”、“1000”或“100”来查看拆分数量不会发生显著变化,一旦实现假定有足够的块来保持所有 CPU 核心忙碌。
由于你的分裂器也不知道消耗流每个元素的工作负载,所以你不必担心这个问题。如果你可以平稳地支持拆分到单个元素,请这样做。
¹ 它没有针对没有链接任何操作的情况进行特殊优化。

当元素数量达到100个时,StreamSupport将会分割成长度为2的流(我看到的最后一次分割是在“Split on count: 4”)。对于总长度为1000个元素的情况,分割将会降至大约32个元素。我将更新我的问题。 - markspace
我仍然有些惊讶它能够这样工作,虽然在许多情况下这种分割方式似乎是有害的。我仍在努力理解这可能作为默认行为的想法。 - markspace
2
此答案所述:“如果您请求并行,则会得到并行,即使它实际上降低了性能。” 如该答案所示,当工作量足够大时,甚至可以同时处理两个元素。请注意,深度分割并不意味着它在不同的线程上运行;如果实际工作负载较低,则本地处理线程可能会在另一个线程拾取之前取下下一个块。然后,您只需要创建另一个轻量级对象的小开销。没有“并行将有益”的魔法阈值。 - Holger
好的,我相信StreamSupport类没有问题,它的行为是合理的。感谢您指引我正确的方向。 - markspace

2

除非我漏掉了什么,你可以在构造函数中传递一个bufferSize,并将其用于trySplit

@Override
public Spliterator<Float> trySplit() {

     if( count > 1 ) {
        splits++;
        if(count > bufferSize) {
            count = count - bufferSize;
            return new TestingSpliterator( bufferSize, bufferSize);
        }

    }
    return null;
}

并且随着这个:

TestingSpliterator splits = new TestingSpliterator(12, 5);
Stream<Float> test = StreamSupport.stream(splits, true);

test.map(x -> new AbstractMap.SimpleEntry<>(
                   x.doubleValue(), 
                   Thread.currentThread().getName()))
    .collect(Collectors.groupingBy(
                Map.Entry::getValue, 
                Collectors.mapping(
                     Map.Entry::getKey, 
                     Collectors.toList())))
    .forEach((x, y) -> System.out.println("Thread : " + x + " processed : " + y));

您会看到有3个线程。其中两个线程处理5个元素,一个线程处理2个元素。


嗯,我想显而易见的是,我希望API提供了一些可以为我完成这个任务的功能。我有点惊讶于我必须自己管理这个问题,API没有提供一个合理的默认分割方式。我会再仔细研究一下,但非常感谢您的建议。 - markspace
1
当然,如果分裂器不知道实际的流操作,就决定了bufferSize的有效性,那么这将是一种倒退。有人决定将十个元素的流转换为并行流(向StreamSupport.stream(splits, true)传递true),而当知道应该做什么工作的人决定进行并行处理时,Stream API实现(也不是分裂器)的任务并不是有效地关闭并行处理。 - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接