ExecutorService的惊人性能拐点——经验法则是什么?

24

我正在尝试弄清如何正确使用Java的Executors。我意识到向ExecutorService提交任务会有一定的开销。但是,我惊讶地发现它的开销是如此之高。

我的程序需要处理大量的数据(股市数据),并且需要尽可能低的延迟。大部分计算都是相当简单的算术运算。

我尝试测试了一个非常简单的东西:“Math.random() * Math.random()

最简单的测试在一个简单的循环中运行这个计算。第二个测试将相同的计算放在匿名Runnable内(这应该测量创建新对象的成本)。第三个测试将Runnable传递给一个ExecutorService(这将测量引入执行器的成本)。

我在我的小笔记本电脑上运行了这些测试(2个CPU,1.5 GB RAM):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

注意,执行器比在单个线程上执行要花费更多的时间。在线程池大小为1到8之间时,数字大约相同。

问题:我是否漏掉了一些明显的东西,还是这些结果是预期的?这些结果告诉我,我传递给执行器的任何任务都必须进行一些非平凡的计算。如果我正在处理数百万条消息,并且需要对每个消息执行非常简单(而便宜)的转换,则仍然可能无法使用执行器...尝试跨多个 CPU 分散计算可能会变得比在单个线程中执行计算更加昂贵。设计决策比我最初想象的要复杂得多。有什么想法吗?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}

哇,预览时代码格式化得比最终结果好多了。我该怎么修复这个问题? - Shahbaz
1
我刚刚重新格式化了它,看起来更好了吗? - ZZ Coder
谢谢 ZZ Coder,代码现在看起来应该是这样的。 - Shahbaz
是的,我还没有运行这些代码示例,但我强烈怀疑你上面的ExecutorService运行中几乎所有的时间都来自于ExecutorService的创建,甚至在那里也可能是为其工作生成新线程。 - gsteff
不,创建服务和线程是微不足道的。时间是由于在Math.random上进行锁定。 - adrianos
11个回答

21
  1. 使用执行器(executors)是为了利用CPU和/或CPU核心,因此如果您创建一个线程池来充分利用最好的CPU数量,您必须有与CPU /核心数量相同的线程。
  2. 您是对的,创建新对象成本太高。因此,减少开销的一种方法是使用批处理(batching)。如果您知道要执行的计算类型和数量,那么就可以创建批处理(batch)。所以考虑在一个执行任务中完成数千个计算。您为每个线程创建批次。一旦计算完成(java.util.concurrent.Future),则创建下一个批次。甚至可以并行地创建新批次(4个CPU -> 3个线程进行计算,1个线程进行批处理)。最终,您可能会获得更高的吞吐量,但需要更高的内存需求(批处理,提供)。

注意: 我修改了你的例子,并在我的双核x200笔记本电脑上运行它。

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

从源代码中可以看出,我将批量提供和执行器生命周期的时间从测量结果中剔除了。与其他两种方法相比,这样更加公平。

您可以自行查看结果...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}

如果我只修改了computationWithObjCreationAndExecutors,使其变为es.submit(batch).get(),那么所有三个的时间都会缩短,即:provisioned:要执行2批次 simpleComputation:96 computationWithObjCreation:102 computationWithObjCreationAndExecutors:96我有什么遗漏吗? - Lydon Ch
你说得对。当调用Executor#awaitTermination()后,'computationWithObjCreationAndExecutors'的秒表需要停止。我会在代码中进行更新。并行执行的批处理仍然更快(在多个核心上)。但是在调用Executor#awaitTermination()和Executor#shutdown()时,还有一些内部工作正在进行,所以实际速度比显示的略慢一些。虽然最终执行结果取决于您的环境。在OSX平台上,核心/ CPU使用情况(VM、VM选项、操作系统的调度程序)可能会有所不同... - cafebabe
1
有可能批处理的执行会变慢。也许操作系统在批处理执行期间会阻止一个或多个核心的执行,以便为具有更高优先级的任务腾出空间。上面的示例仅应说明并行执行和批处理的概念,用于计算较大数据集。即使是简单的计算问题也不会得到充分的基准测试结果。真正的基准测试需要更多的努力,并且需要一个干净的环境,没有干扰您系统的任务(如检查即时通讯或邮件更新、更新窗口、更新时钟等)。 - cafebabe
2
Math.random受同步的影响,因此多线程测试并不真正有效,无法作为同步执行的性能比较。 - adrianos
1
@adrianos 是对的。不要使用 Math.random()。相反,先新建一个 Random(),然后使用 nextDouble() 生成随机双精度数。这将避免同步问题。 - Guocheng
显示剩余2条评论

8
这个测试不公平,原因如下:
  1. 你只有一个线程,根本没有充分利用线程池。
  2. 任务太简单了,线程池的开销无法证明其必要性。在仅有FPP的CPU上进行乘法运算只需要几个周期。
考虑到线程池除了创建对象和运行作业之外还需要执行以下额外步骤:
  1. 将作业放入队列中
  2. 从队列中删除作业
  3. 获取线程并执行作业
  4. 将线程返回到池中
当你有一个真正的作业和多个线程时,线程池的好处就会显现出来。

1
我赞同ZZ Coder的观点;根据我的经验,当线程池更大时,这些好处将会更明显。 - Everyone
执行器不必“获取”和“返回”线程。它创建一个轮询任务队列的内部工作线程。此外,由于任务的低时间复杂度,只使用一个线程可能是有优势的,否则,在竞争阻塞队列锁时,可能会出现问题,导致将工作线程移动到可运行状态。真正的成本?去内核创建一个线程,并在等待线程终止时调用阻塞操作。100,000并不算多。但是我们应该认识到性能调优需要测试。 - Tim Bender
我尝试了1到8个线程池大小,它们都返回了大约相同的数字。我专注于1个线程池大小,因为我想测量执行器框架的开销。您的评论确实加强了我需要进一步研究该框架内部的必要性。 - Shahbaz

7
您提到的“开销”与ExecutorService无关,而是由多个线程在Math.random上同步导致的锁争用。
因此,是的,您缺少了一些东西(下面的“正确”答案实际上并不正确)。
这是一些Java 8代码,演示8个线程运行一个简单的函数,其中没有锁争用:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

进行了120个25k操作的测试,结果如下(以毫秒为单位):

  • 同步执行时间:9956
  • 同步批处理执行时间:9900
  • 每批线程执行时间:2176
  • Executor池执行时间:1922

胜者:Executor服务。


4
我认为这并不现实,因为每次调用方法时都会创建一个新的执行器服务。除非您有非常奇怪的要求,否则这似乎不现实 - 通常您会在应用程序启动时创建服务,然后将作业提交给它。
如果您再次尝试基准测试,但是在计时循环之外将服务初始化为字段,那么您将看到将Runnables提交给服务与自己运行它们的实际开销。
但我认为您还没有完全掌握重点 - Executors并不是为了效率而存在的,它们存在是为了使协调和交接工作到线程池更简单。它们始终比仅调用Runnable.run()自己要低效(因为归根结底,执行器服务仍然需要在此之前执行一些额外的清理工作)。当您从需要异步处理的多个线程中使用它们时,它们真正闪耀。
还要考虑到您正在查看基本固定成本(无论您的任务需要1ms还是1hr运行,执行器开销相同)与非常小的可变量之间的相对时间差异(您的琐碎可运行内容)。如果执行器服务需要额外花费5ms来运行1ms的任务,则这不是一个非常有利的数字。如果它需要额外花费5ms来运行5秒钟的任务(例如非琐碎的SQL查询),那么这完全可以忽略不计,并且完全值得。
因此,在某种程度上,它取决于您的情况 - 如果您有一个极其时间关键的部分,运行许多小任务,这些任务不需要并行或异步执行,则从执行器中无法获得任何东西。如果您正在并行处理较重的任务并希望以异步方式响应(例如Web应用程序),则执行器非常好。它们是否是最佳选择取决于您的情况,但实际上您需要使用具有代表性的真实数据进行测试。除非您的任务确实如此琐碎(并且您不想重复使用执行器实例...),否则我认为从您所做的测试中得出任何结论都不合适。

我在方法内初始化执行器,但不在循环内部。我只是使用方法来保持测试的独立性。我知道执行器有它们的开销,但我很惊讶它是如此之高。不幸的是(或者幸运的是),我的大多数计算确实是那么微不足道的(简单的算术运算),除了它们是在大量的消息上完成的。想象一下一个处理大量消息的消息系统,但每个消息的转换并不是过于昂贵的。从这里我得到的结论是,我需要以与最初思考的不同的粒度使我的程序并发。 - Shahbaz

4

Math.random() 实际上是在单个随机数生成器上进行同步。调用 Math.random() 会导致对该数字生成器的显著争用。实际上,您拥有的线程越多,速度就越慢。

从 Math.random() 的 javadoc 中可以看到:

该方法被正确同步,以允许多个线程正确使用。但是,如果需要让许多线程以大量的速率生成伪随机数,则每个线程都拥有自己的伪随机数生成器可能会减少每个线程的争用。


1

以下是我机器上的结果(64位Ubuntu 14.0上的OpenJDK 8,Thinkpad W530)

simpleCompuation:6
computationWithObjCreation:5
computationWithObjCreationAndExecutors:33

肯定会有额外开销。但请记住这些数字代表的是:每 100k 次迭代所需的毫秒数。在您的案例中,每次迭代的额外开销约为 4 微秒。而对于我来说,额外开销约为四分之一微秒。

这里的额外开销指的是同步、内部数据结构以及由于代码路径复杂而可能缺乏 JIT 优化(肯定比您的 for 循环更复杂)。

实际上值得并行处理的任务,即使有四分之一微秒的额外开销,也是值得的。


提供信息,这将是一个非常不适合并行计算的计算。我将线程数增加到8(核心数):

simpleCompuation:5
computationWithObjCreation:6
computationWithObjCreationAndExecutors:38

它并没有使其更快。这是因为Math.random()是同步的。

1

首先,微基准测试存在一些问题。你进行了预热,这是好的。然而,最好运行多次测试,这应该能够感受到它是否真正预热以及结果的差异性。此外,最好分别对每个算法进行测试,否则可能会在算法更改时导致去优化。

任务非常小,虽然我不确定有多小。因此,速度快多少倍是相当无意义的。在多线程情况下,它将触及相同的易失性位置,因此线程可能会导致非常糟糕的性能(每个线程使用一个Random实例)。此外,47毫秒的运行时间有点短。

显然,为了执行微小操作而转到另一个线程不会很快。如果可能的话,将任务分成更大的大小。JDK7看起来将拥有一个分支-合并框架,它试图通过优先在同一线程上按顺序执行任务,并通过空闲线程提取较大的任务来支持细分和征服算法的细小任务。


多次运行测试的观点很好。实际上我已经运行了很多次,只是粘贴了单个结果。我理解您关于改进基准测试的观点。 - Shahbaz

0
如果对其他人有用的话,这里是在三星安卓设备上使用ExecutorService重复执行直到所有任务结束的真实场景测试结果。
 Simple computation (MS): 102
 Use threads (MS): 31049
 Use ExecutorService (MS): 257

代码:

   ExecutorService executorService = Executors.newFixedThreadPool(1);
        int count = 100000;

        //Simple computation
        Instant instant = Instant.now();
        for (int i = 0; i < count; i++) {
            double x = Math.random() * Math.random();
        }
        Duration duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Simple computation (MS): " + duration.toMillis());


        //Use threads
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                double x = Math.random() * Math.random();
            }
            ).start();
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use threads (MS): " + duration.toMillis());


        //Use ExecutorService
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                        double x = Math.random() * Math.random();
                    }
            );
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use ExecutorService (MS): " + duration.toMillis());

0
Fixed ThreadPool 的最终目的是重复使用已经创建的线程。因此,性能的提升在于不需要每次提交任务时重新创建新线程。因此,停止时间必须在提交的任务内部进行处理,即在 run 方法的最后一个语句中进行处理。

0
你需要以某种方式对执行进行分组,以便将更大的计算部分提交给每个线程(例如,根据股票符号构建组)。 在类似的情况下,我使用Disruptor得到了最佳结果。它具有非常低的每个作业开销。仍然重要的是分组作业,天真的轮询通常会创建许多缓存未命中。
请参见 http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接