ExecutorService.submit(<callable>)需要更多时间吗?

7

我正在尝试了解java.util.concurrent包中的实用工具。学到我们可以向ExecutorService提交callable对象,它将返回一个Future,该对象填充着在call()方法成功完成任务后由callable返回的值。

我理解所有的callable都使用多个线程并发执行。

当我想要查看ExecutorService相对于批处理任务执行带来的改进时,我想记录时间。

以下是我尝试执行的代码 -

package concurrency;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class ExecutorExample {

    private static Callable<String> callable = new Callable<String>() {

        @Override
        public String call() throws Exception {
            StringBuilder builder = new StringBuilder();
            for(int i=0; i<5; i++) {
                builder.append(i);
            }
            return builder.toString();
        }
    };

    public static void main(String [] args) {
        long start = System.currentTimeMillis();
        ExecutorService service = Executors.newFixedThreadPool(5);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for(int i=0; i<5; i++) {
            Future<String> value = service.submit(callable);
            futures.add(value);
        }
        for(Future<String> f : futures) {
            try {
                System.out.println(f.isDone() + " " + f.get());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        long end  = System.currentTimeMillis();
        System.out.println("Executer callable time - " + (end - start));
        service.shutdown();

        start = System.currentTimeMillis();
        for(int i=0; i<5; i++) {
            StringBuilder builder = new StringBuilder();
            for(int j=0; j<5; j++) {
                builder.append(j);
            }
            System.out.println(builder.toString());
        }
        end = System.currentTimeMillis();
        System.out.println("Normal time - " + (end - start));
    }

}

这是它的输出结果 -
true 01234
true 01234
true 01234
true 01234
true 01234
Executer callable time - 5
01234
01234
01234
01234
01234
Normal time - 0

请告诉我是否有遗漏或对某些内容的理解有误。
非常感谢您抽出时间来帮助这个讨论。

5
你做错的是这个“基准测试”太小了,无法充分利用多线程处理工作所带来的好处。尝试一些需要时间的任务,比如在每个线程上计算前10000个素数之类的东西... 同时,你也没有进行任何“热身”迭代:JVM 在首次执行特定代码块时可能需要执行一些操作,因此你应该进行几次不计入最终结果的迭代... - ppeterka
@ppeterka,感谢您的评论。然而,我尝试了“Runnable”而不是“Callable”(ExecutorService.submit(<runnable>)使用相同的任务。我看到了整个任务时间消耗的改善。Runnable只有0毫秒的差异。因此,我想问这个问题。 - sanbhat
在同一JVM执行中,“Runnable”测试是否在“Callable”测试之后执行?如果在循环中进行10000次迭代,数字如何比较?在这个范围内测量时间最好是有问题的... - ppeterka
是的,主方法具有可调用、可运行和普通执行。 - sanbhat
请阅读我的第一条评论的后半部分:你必须有一些不被任何方式测量的预热迭代。JVM需要进行一些初始化,这仅在特定代码部分第一次运行时需要花费大量时间。复制main方法的内容两次以了解我的意思。 - ppeterka
3个回答

4
如果您在Callable中的任务太小,由于任务切换和初始化开销,您将无法从并发中获得好处。尝试在callable中添加更多的重量级循环,比如1000000次迭代,您会看到差异。

2
当你运行任何代码,尤其是第一次运行时,会花费时间。如果你将任务传递给另一个线程,可能需要1-10微秒的时间,如果你的任务所需时间少于此,则开销可能大于收益。也就是说,如果开销足够高,则使用多个线程可能比使用单个线程要慢得多。
我建议你:
- 将任务的成本增加到1000次迭代。 - 确保在单线程示例中不丢弃结果。 - 运行两个测试至少几秒钟,以确保代码已经预热。

1

这不是一个答案(但我不确定代码是否适合评论)。为了进一步扩展彼得所说的,通常有一个甜点来平衡作业大小(以执行时间衡量)与工作人员间公平的工作分配,以此来减少池/队列开销。代码示例有助于找到这个甜点的估计值。在目标硬件上运行。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FibonacciFork extends RecursiveTask<Long> {

private static final long serialVersionUID = 1L;

public FibonacciFork( long n) {
    super();
    this.n = n;
}

static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors());

static long fibonacci0( long n) {
    if ( n < 2) {
        return n;
    }
    return fibonacci0( n - 1) + fibonacci0( n - 2);
}

static int  rekLimit = 8;

private static long stealCount;

long    n;

private long forkCount;

private static AtomicLong forks = new AtomicLong( 0);

public static void main( String[] args) {

    int n = 45;
    long    times[] = getSingleThreadNanos( n);
    System.out.println( "Single Thread Times complete");
    for ( int r = 2;  r <= n;  r++) {
        runWithRecursionLimit( r, n, times[ r]);
    }
}

private static long[] getSingleThreadNanos( int n) {
    final long times[] = new long[ n + 1];
    ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 2));
    for ( int i = 2;  i <= n;  i++) {
        final int arg = i;
        Runnable runner = new Runnable() {
            @Override
            public void run() {
                long    start = System.nanoTime();
                final int minRuntime = 1000000000;
                long    runUntil = start + minRuntime;
                long    result = fibonacci0( arg);
                long    end = System.nanoTime();
                int         ntimes = Math.max( 1, ( int) ( minRuntime / ( end - start)));
                if ( ntimes > 1) {
                    start = System.nanoTime();
                    for ( int i = 0;  i < ntimes;  i++) {
                        result = fibonacci0( arg);
                    }
                    end = System.nanoTime();
                }
                times[ arg] = ( end - start) / ntimes;
            }
        };
        es.execute( runner);
    }
    es.shutdown();
    try {
        es.awaitTermination( 1, TimeUnit.HOURS);
    } catch ( InterruptedException e) {
        System.out.println( "Single Timeout");
    }
    return times;
}

private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos) {
    rekLimit = r;
    long    start = System.currentTimeMillis();
    long    result = fibonacci( arg);
    long    end = System.currentTimeMillis();
    // Steals zählen
    long    currentSteals = fjp.getStealCount();
    long    newSteals = currentSteals - stealCount;
    stealCount = currentSteals;
    long    forksCount = forks.getAndSet( 0);
    System.out.println( "Fib(" + arg + ")=" + result + " in " + ( end-start) + "ms, recursion limit: " + r +
            " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount);
}

static long fibonacci( final long arg) {
    FibonacciFork   task = new FibonacciFork( arg);
    long result = fjp.invoke( task);
    forks.set( task.forkCount);
    return result;
}

@Override
protected Long compute() {
    if ( n <= rekLimit) {
        return fibonacci0( n);
    }
    FibonacciFork   ff1 = new FibonacciFork( n-1);
    FibonacciFork   ff2 = new FibonacciFork( n-2);
    ff1.fork();
    long    r2 = ff2.compute();
    long    r1 = ff1.join();
    forkCount = ff2.forkCount + ff1.forkCount + 1;
    return r1 + r2;
}
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接