Stream.reduce()和Stream.collect()之间令人惊讶的性能差异

3
我将比较两个Java8流终端操作reduce()collect()的并行性能。以下是一个Java8并行流示例:
import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;

public class StartMe {

    static Function<Long, BigInteger> fac;

    static {
        fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
    }

    static long N = 2000;

    static Supplier<BigInteger[]> one() {
        BigInteger[] result = new BigInteger[1];
        result[0] = ONE;
        return () -> result;
    }

    static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
        return (BigInteger[] ba, BigInteger b) -> {
            synchronized (fac) {
                ba[0] = ba[0].multiply(b);
            }
        };
    }

    static BiConsumer<BigInteger[], BigInteger[]> combiner() {
        return (BigInteger[] b1, BigInteger[] b2) -> {};
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();

        BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
        long t1 = System.currentTimeMillis();

        BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
        long t2 = System.currentTimeMillis();

        BigInteger result3 = fac.apply(N);
        long t3 = System.currentTimeMillis();

        System.out.println("reduce():  deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
        System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
        System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);

    }
}

它使用一些算法(尽管可能有点奇怪)来计算n!。

然而,性能结果令人惊讶:

 reduce():  deltaT = 44ms, result 1 = 3316275...
 collect(): deltaT = 22ms, result 2 = 3316275...
 recursive: deltaT = 11ms, result 3 = 3316275...

一些备注:

  • 我不得不同步 accumulator(),因为它并行访问同一数组。
  • 我原本以为 reduce()collect() 的性能应该是一样的,但事实上 reduce()collect() 慢了约两倍,尽管 collect() 必须被同步!
  • 最快的算法是顺序和递归的算法(这可能会显示出并行流管理的巨大开销)

我没想到 reduce() 的性能会比 collect() 更差。为什么会这样?


7
如何在Java中编写正确的微基准测试? - resueman
1
@resueman 我建议你看一下JMH框架。它有一个很好的教程。http://openjdk.java.net/projects/code-tools/jmh/ - Stuart Marks
1个回答

8

基本上,你正在测量第一次执行的代码的初始开销。此时优化器尚未进行任何工作,你正在测量加载、验证和初始化类的开销。

因此,随着每次评估都可以重用已为前一个评估加载的类,所以评估时间会减少。在循环中运行所有三个评估,或者仅更改顺序,将给您带来完全不同的结果。

唯一可预测的结果是,简单的递归评估将具有初始开销最小值,因为它不需要加载Stream API类。


如果您多次运行代码,或者更好地使用复杂的基准测试工具,我猜您会得到与我的类似的结果,即reduce明显优于collect,并且确实比单线程方法更快。

collect较慢的原因是因为您完全使用错误的方法。每个线程都会查询Supplier来获取不同的容器,因此累加器函数不需要任何额外的同步。但是,组合函数完成正确的工作以将不同线程的结果容器合并为单个结果非常重要。

正确的方法是:

BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
  .collect(()->new BigInteger[]{ONE},
           (a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));

在我的系统上,使用reduce方法的性能与该方法相当。由于使用数组作为可变容器无法改变BigInteger的不可变特性,因此在这里使用collect没有任何优势,在正确使用时,使用reduce是直接的且具有等效的性能。
另外,我不明白为什么会有那么多程序员尝试创建自引用的Lambda表达式。递归函数的直观方法仍然是使用一个方法。
static BigInteger fac(long x) {
    return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;

(虽然在您的代码中,您根本不需要 Function<Long, BigInteger>,只需直接调用 fac(long) 即可)。


最后需要说明的是,Stream.iterateStream.limit都非常不适合并行执行。使用具有可预测大小和独立操作的流将显着提高您的解决方案性能:

BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
    .mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接