同时求和的最佳方法

6
我正在尝试计算一些大数字,为了加快计算速度,我想利用多线程。每个线程应该计算一个数字,最后计算总和。
我曾经看到过使用 SumThreadCollector 的解决方案,如下所示:
public BigInteger compute(int p) {
    Collector c = new Collector(p);

    for(T element : Collection<T> bigCollection) {
        new SumThread(c) {

            @Override
            protected void doTheJob() {
                long big = someVeryComplexCalculation(element, ...); //n!
                receive(BigInteger.valueOf(big));
            }

        }
    }

    if(collector.isReady())
        return collector.getResult();

    return null;
}

public class Collector {

    private int numberOfProcesses;
    private int numberOfAllowedProcesses;
    private BigInteger result;

    public Collector(int n) {
        numberOfAllowedProcesses = n;
        numberOfProcesses = 0;
        result = BigInteger.ZERO;
    }

    synchronized public void enter() throws InterruptedException {
        if (numberOfProcesses == numberOfAllowedProcesses) wait();
        numberOfProcesses++;
    }

    synchronized public void leave() {
        numberOfProcesses--;
        notify();
    }

    synchronized public void register(BigInteger v) {
        result = result.add(v);
    }

    synchronized public boolean isReady() throws InterruptedException {
        while (numberOfProcesses > 0) wait();
        return true;
    }

    ...
}

public abstract class SumThread extends Thread {

    private Collector collector;

    public SumThread(Collector c) throws InterruptedException {
        collector = c;
        collector.enter();
    }

    abstract protected void doTheJob(); //complex calculations can be done in here

    public void receive(BigInteger t) {
        collector.register(t);
    }

    public void run() {
        doTheJob();
        collector.leave();
    }
}

我本以为可以轻松地使用 ExecutorService 来代替不断创建新的 Thread,从而提高性能。

public BigInteger compute(int p) {
    ExecutorService pool = Executors.newFixedThreadPool(p);
    Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()];
    int i = 0;

    for(T element : Collection<T> bigCollection) {
        futures[i++] = p.submit(new Callable<BigInteger>() {

            @Override
            public BigInteger call() {
                long big = someVeryComplexCalculation(element, ...); //n!
                return BigInteger.valueOf(big);
            }

        }
    }

    // or with ExecutorCompletionService, but the loop remains I guess
    BigInteger res = BigInteger.ZERO
    for(Future<BigInteger> f : futures)
        res = res.add(f.get());

    return res;
}

然而,这段代码并没有超越 SumThread-Collector 的解决方案。我也看到了一些关于 LongAdder 的内容,但我需要一个适用于 BigInteger 的加法器...

我的问题是:最好的并行计算求和的方法是什么?是上述方法之一还是完全不同(但更好)的方式?

2个回答

7

你提到了Java-8中新增的LongAdder并使用effectively-final变量,我假设你正在使用Java-8。在这个版本中,解决你的任务最好的方法是使用Stream API

BigInteger result = bigCollection.parallelStream()
                     .map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...)))
                     .reduce(BigInteger.ZERO, BigInteger::add);

你的问题是一个典型的映射-归约任务,需要将某个集合中的每个元素进行转换,然后将单个转换的结果组合成最终结果。Stream API 能够在不需要手动工作的情况下有效地并行化这种任务。在 Oracle JDK 中,这些任务会在 common ForkJoinPool pool 中执行,默认情况下会创建与 CPU 核心数相同的线程。

这确实非常有效!只是出于好奇,是否有可能将其应用于Iterable<T>的实例?有没有一种方法可以压缩流?(例如:_complexCalculation() = part1()*part2(),因此我想在bigCollection上映射part1()和part2()并并行合并来自两个结果流的结果与乘法_) - Mr Tsjolder
1
@MrTsjolder,为了能够并行处理流,您的流源(Iterable或其他)必须提供一个良好分割源的Spliterator。如果您对自己的源有疑问,您可以将Iterable<T>的内容复制到ArrayList<T>中。至于压缩,我没有看到确切的源代码无法回答。您可以提出另一个问题,并提供所有必要的细节(使用“java-stream”标签,我会一直监控它)。 - Tagir Valeev

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接