Java - 多线程一个大循环

4

这可能是一个非常简单的问题,但因为我以前从未使用过线程,所以最好询问而不是完全自己找到最佳解决方案。

我有一个运行次数达数十亿次的巨大for循环。在每个循环运行中,根据当前的index,程序会计算出一个数字形式的最终结果。我只想存储最高的result(或前x个结果)及其对应的索引。

我的问题很简单,如何正确地通过线程运行此循环,以使用所有可用的CPU /核心。

int topResultIndex;
double topResult = 0;

for (i=1; i < 1000000000; ++i) {
    double result = // some complicated calculation based on the current index
    if (result > topResult) {
        topResult = result;
        topResultIndex = i;
    }
}

每个索引的计算是完全独立的,没有共享资源。尽管如此,每个线程都会访问topResultIndextopResult
*更新:Giulio和rolfl的解决方案都很好,也非常相似。我只能接受其中一个作为我的答案。

3
每个指数的计算是独立的,还是会有共享资源用于计算? - Jeffrey
每个索引的计算是完全独立的。 - SportySpice
如果循环受到CPU限制,多线程将增加其速度(由Amdahl定律给出的因子)。如果瓶颈是内存,则无法起作用(因为多线程不能使RAM运行更快)。 - Giulio Franco
当我在双核CPU上运行循环时,我看到CPU使用率恰好达到50%,因此我可以假设CPU确实是瓶颈。当然,在实际运行线程后才能确定。 - SportySpice
2
建议您也查看http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html。 - vandale
3个回答

6
假设结果由一个名为calculateResult(long)的方法计算,该方法是私有静态的且不访问任何静态字段(它也可以是非静态的,但仍必须是线程安全和可同时执行的,最好是线程限制的)。
然后我认为以下代码会完成这项任务:
public static class Response {
    int index;
    double result;
}

private static class MyTask implements Callable<Response> {
    private long from;
    private long to;

    public MyTask(long fromIndexInclusive, long toIndexExclusive) {
        this.from = fromIndexInclusive;
        this.to = toIndexExclusive;
    }

    public Response call() {
        int topResultIndex;
        double topResult = 0;

        for (long i = from; i < to; ++i) {
            double result = calculateResult(i);
            if (result > topResult) {
                topResult = result;
                topResultIndex = i;
            }
        }

        Response res = new Response();
        res.index = topResultIndex;
        res.result = topResult;
        return res;
    }
};

private static calculateResult(long index) { ... }

public Response interfaceMethod() {
    //You might want to make this static/shared/global
    ExecutorService svc = Executors.newCachedThreadPool();

    int chunks = Runtime.getRuntime().availableProcessors();
    long iterations = 1000000000;
    MyTask[] tasks = new MyTask[chunks];
    for (int i = 0; i < chunks; ++i) {
        //You'd better cast to double and round here
        tasks[i] = new MyTask(iterations / chunks * i, iterations / chunks * (i + 1));
    }

    List<Future<Response>> resp = svc.invokeAll(Arrays.asList(tasks));
    Iterator<Future<Response>> respIt = resp.iterator();

    //You'll have to handle exceptions here
    Response bestResponse = respIt.next().get();

    while (respIt.hasNext()) {
        Response r = respIt.next().get();
        if (r.result > bestResponse.result) {
            bestResponse = r;
        }
    }

    return bestResponse;
}

根据我的经验,在分块时比每个索引都创建一个任务要快得多(尤其是当每个单独的索引的计算负载很小,比如说少于半秒钟)。不过编码会稍微有些困难,因为你需要进行两步最大化操作(首先针对分块级别,然后再进行全局级别的操作)。通过这种方式,如果计算仅基于 CPU 运行(不会过度使用 RAM),则应该可以获得接近于物理核心数80%的速度提升。


如果 calculateResult() 包含涉及等待一两秒钟的网络请求,您会怎么处理呢?您会为每个索引分配一个任务吗? - splinter123
仍然取决于您需要进行多少个请求。一般来说,我会在任务和索引之间建立1:1的关系,但您不希望用等待线程淹没您的操作系统。对于普通的台式机/笔记本电脑,对于网络或硬盘驱动器请求,我会避免从单个调用中生成超过20-30个线程。还要考虑到您需要等待的设备有其自己的限制,将太多的请求放在上面是没有帮助的。无论如何,当任务受IO限制时,任务的数量可以比CPU的数量大得多。 - Giulio Franco

2
除了观察到使用OpenMP或其他并行计算扩展的C程序可能是更好的选择之外,用Java实现的方式是创建一个“Future”任务来计算问题的子集:
private static final class Result {
   final int index;
   final double result;
   public Result (int index, double result) {
       this.result = result;
       this.index = index;
   }
}

// Calculate 10,000 values in each thead
int steps = 10000;
int cpucount = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(cpucount);
ArrayList<Future<Result>> results = new ArrayList<>();
for (int i = 0; i < 1000000000; i+= steps) {
    final int from = i;
    final int to = from + steps;
    results.add(service.submit(new Callable<Result>() {
        public Result call() {
              int topResultIndex = -1;
              double topResult = 0;
              for (int j = from; j < to; j++) {
                  // do complicated things with 'j'
                      double result = // some complicated calculation based on the current index
                      if (result > topResult) {
                          topResult = result;
                          topResultIndex = j;
                      }
              }
              return new Result(topResultIndex, topResult);
        }
    });
 }

 service.shutdown();
 while (!service.isTerminated()) {
     System.out.println("Waiting for threads to complete");
     service.awaitTermination(10, TimeUnit.SECONDS);
 }
 Result best = null;
 for (Future<Result> fut : results) {
    if (best == null || fut.result > best.result) {
       best = fut;
    }
 }

 System.out.printf("Best result is %f at index %d\n", best.result, best.index);

Future<Result>

非常感谢您提供深入的解决方案。您认为用C/C++编写会更好吗?我的整个程序已经是用Java编写的,但如果确实能够显著改善,我想考虑用C++编写程序的这一部分。 - SportySpice
实际上,调用 Runtime.getRuntime().availableProcessors() 将返回处理器数量,而无需使用任何外部库。 - vandale
编辑过了,谢谢vandale。@SportySpice - 这取决于情况。在你特定的情况下,Java JIT可能能够竞争得很激烈。 - rolfl

1

最简单的方法是使用ExecutorService并将您的任务提交为RunnableCallable。您可以使用Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())创建一个ExeuctorService,它将使用与处理器数量相同的线程。


好的,但我的“任务”是什么?我应该只是分割循环吗?例如,如果我在2个处理器上,一个将从0到500000000运行,另一个将从500000001到1000000000运行?还是有更合适的解决方案?或者你是指每个循环运行本身将是一个新的可运行对象吗?(创建这么多对象听起来不太明智) - SportySpice
你的任务是进行长时间复杂计算。创建一个名为 getResult(int index) 的方法,并将其作为你的任务。 - Jeffrey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接