连接并行流

14
假设我有两个int[]数组input1input2。我想从第一个数组中仅取正数,从第二个数组中取不同的数字,将它们合并,排序并存储到结果数组中。这可以使用流来执行:
int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                   Arrays.stream(input2).distinct()).sorted().toArray();

我想加速任务,所以考虑将流并行化。通常,这意味着我可以在流构造和终端操作之间插入.parallel(),结果将是相同的。IntStream.concat 的JavaDoc表明,如果输入流中任何一个流是并行的,则生成的流也将是并行的。因此,我认为将 parallel() 应用于输入1流、输入2流或连接后的流将产生相同的结果。
实际上,我错了:如果我在生成的流中添加 .parallel(),则输入流似乎仍然是顺序的。而且,我可以标记输入流(其中一个或两个),然后将生成的流变为 .sequential(),但输入仍然是并行的。因此,实际上有8种可能性:输入1、输入2和连接后的流中的任何一个都可以是并行或顺序的。
int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();

我为不同的输入大小(在Core i5 4xCPU,Win7上使用JDK 8u45 64位)进行了基准测试,并且对于每种情况都得到了不同的结果:

Benchmark           (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS      100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS    10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS  1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP      100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP    10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP  1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS      100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS    10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS  1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP      100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP    10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP  1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS      100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS    10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS  1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP      100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP    10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP  1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS      100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS    10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS  1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP      100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP    10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP  1000000  avgt   20  393699.222 ± 56397.445  us/op

从这些结果中,我只能得出这样的结论:并行化distinct()步骤会降低整体性能(至少在我的测试中是这样)。
因此,我有以下问题:
  1. 是否有任何官方指南,可以更好地使用串联流的并行化?不总是可行测试所有可能的组合(特别是当连接超过两个流时),因此有一些“经验法则”将是不错的。
  2. 似乎如果我连接直接从集合/数组创建的流(在连接之前没有执行中间操作),那么结果不会太依赖于parallel()的位置。这是真的吗?
  3. 除了串联之外,还有哪些情况取决于流管道何时并行化的结果?
1个回答

8
规范精确地描述了您所获得的内容——当您考虑到,与其他操作不同,我们不是在谈论单个管道,而是三个独立保留其属性的“Stream”。规范说:“如果任何一个输入流是并行的,则生成的流也是并行的。”这就是您所获得的;如果任一“input”流是并行的,则“resulting”流也是并行的(但您可以在之后将其转换为顺序)。但是,将“resulting”流更改为并行或顺序不会更改“input”流的性质,将并行和顺序流馈入“concat”也是如此。
关于性能后果,请参阅文档,“Stream operations and pipelines”段落
中间操作进一步分为无状态和有状态操作。无状态操作,例如filtermap,在处理新元素时不保留先前已看到的元素的状态--每个元素可以独立于其他元素的操作进行处理。有状态操作,例如distinctsorted,在处理新元素时可能会合并来自先前已看到元素的状态。
有状态操作可能需要在生成结果之前处理整个输入。例如,不能从对流进行排序中产生任何结果,直到看到流的所有元素。因此,在并行计算下,某些包含有状态中间操作的流水线可能需要多次通过数据或需要缓冲大量数据。只包含无状态中间操作的管道可以在单次传递(无论是顺序还是并行)中处理,且最小化数据缓冲。
您选择了两个名为stateful的操作并将它们组合在一起。因此,所得到的流的.sorted()操作在开始排序之前需要缓冲整个内容,这意味着完成distinct操作。显然,很难并行化不同操作,因为线程必须就已经看到的值进行同步。
回答你的第一个问题,这与concat无关,而是因为distinct无法从并行执行中受益。
这也使得你的第二个问题过时了,因为在两个连接的流中执行完全不同的操作,因此您不能使用预连接的集合/数组进行相同的操作。将数组连接并在结果数组上运行distinct不太可能产生更好的结果。
关于您的第三个问题,flatMapparallel流方面的行为可能会带来一些意外...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接