Rxjava2、Java 8 Streams和传统的迭代方式之间的性能比较

5

我在Java 8和Rx Java中成为了函数式编程的忠实支持者。但最近一位同事指出,在使用这些技术时会有性能损失。因此,我决定运行JMH基准测试,结果发现他是正确的。无论我怎么做,都无法让流版本的性能得到提升。以下是我的代码:

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream().parallel()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Flowable.fromIterable(sourceList)
                       .parallel()
                       .runOn(Schedulers.computation())
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .sequential()
                       .blockingFirst();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

以上代码的结果:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt     Score     Error  Units
StreamVsVanilla.rxjava2  avgt   20  1179.733 ± 322.421  ns/op
StreamVsVanilla.stream   avgt   20    10.556 ±   1.195  ns/op
StreamVsVanilla.vanilla  avgt   20     8.220 ±   0.705  ns/op

即使我删除并使用以下顺序版本的并行操作符:
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

结果并不是很理想:
# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  12.226 ± 0.603  ns/op
StreamVsVanilla.stream   avgt   20  13.432 ± 0.858  ns/op
StreamVsVanilla.vanilla  avgt   20   7.678 ± 0.350  ns/op

有人能帮我找出我的错误吗?

编辑:

akarnokd指出我在顺序版本中使用了额外的阶段来解包和封装流版本(我添加了它以避免过滤和映射方法中的隐式装箱拆箱),但速度变慢了,所以我尝试了下面的代码而没有使用那些额外的阶段。

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .filter(i -> i % 2 == 0)
                .map(Math::sqrt)
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

结果仍然大致相同:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  10.864 ± 0.555  ns/op
StreamVsVanilla.stream   avgt   20  10.466 ± 0.050  ns/op
StreamVsVanilla.vanilla  avgt   20   7.513 ± 0.136  ns/op

还有什么不清楚的吗?尝试使用Annimon的Stream backport,我发现它比Java 8 Stream的开销少。 - akarnokd
2
流(即使是顺序的)已知比常规循环要慢得多。如果您需要性能,请不要使用它们。然而,它们增加了(当使用得当时)易于维护、代码可读性等,这对于大多数用例来说比性能更重要。 - fps
1个回答

4

针对并行版本

启动和分发值到多个线程相对较昂贵。为了抵消这一点,与基础架构开销相比,并行计算通常要昂贵几倍。然而,在RxJava中,Math::sqrt是如此微不足道,以至于并行开销支配了性能。

那么为什么Stream快了两个数量级?我只能假设线程窃取在其中起作用,其中基准线程执行大部分实际工作,也许一个后台线程执行其余的一些小任务,因为当后台线程启动时,主线程已经夺回了大部分任务。因此,在这里,您没有像RxJava的并行执行那样严格的并行执行,其中运算符以轮询方式分派工作,以便所有并行轨道都可以变得大致相等。

针对顺序版本

我认为你在Stream版本中有额外的拆箱和装箱阶段,这增加了一些开销。试着不用它:

   return  sourceList.stream()
            .filter(i -> i % 2 == 0)
            .map(Math::sqrt)
            .collect(Collectors.toList());

你对并行版本的解释听起来很合理,事实上我也在考虑同样的问题。但是我尝试使用并行只是因为我想证明使用流的合理性。我的主要关注点是,除了语法上的好处之外,使用流或Rx Java是否比普通迭代有任何优势?结果表明,在性能方面,它比较差。此外,消除装箱拆箱确实会稍微提高一些性能,但仍然比迭代版本慢两倍。我添加了装箱拆箱以避免在过滤器和映射中出现隐式装箱拆箱(我认为这会提高性能)。 - Shariq
2
RxJava旨在与同步和异步模式一起工作,因为阶段可以是其中之一,然后混合在一起。我花了很多时间优化同步行为,以使同步开销尽可能接近Stream。然而,如果您拥有Java 8 Streams(在大多数Android上不可用),或者只需使用传统的for循环,通常可以避免纯同步方式使用RxJava。 - akarnokd

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接