不需要反复创建线程,是否可以使用多线程?

3

首先,再次感谢所有已经回答我的问题的人。我不是一个非常有经验的程序员,这是我第一次使用多线程。

我得到了一个例子,它的工作方式与我的问题相当类似。我希望它能够减轻我们在这里的情况。

public class ThreadMeasuring {
private static final int TASK_TIME = 1; //microseconds
private static class Batch implements Runnable {
    CountDownLatch countDown;
    public Batch(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    @Override
    public void run() {         
        long t0 =System.nanoTime();
        long t = 0;
        while(t<TASK_TIME*1e6){ t = System.nanoTime() - t0; }

        if(countDown!=null) countDown.countDown();
    }
}

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactory() {
        int counter = 1;
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Executor thread " + (counter++));
            return t;
        }
    };

  // the total duty to be divided in tasks is fixed (problem dependent). 
  // Increase ntasks will mean decrease the task time proportionally. 
  // 4 Is an arbitrary example.
  // This tasks will be executed thousands of times, inside a loop alternating 
  // with serial processing that needs their result and prepare the next ones.
    int ntasks = 4; 
    int nthreads = 2;
    int ncores = Runtime.getRuntime().availableProcessors();
    if (nthreads<ncores) ncores = nthreads;     

    Batch serial = new Batch(null);
    long serialTime = System.nanoTime();
    serial.run();
    serialTime = System.nanoTime() - serialTime;

    ExecutorService executor = Executors.newFixedThreadPool( nthreads, threadFactory );
    CountDownLatch countDown = new CountDownLatch(ntasks);

    ArrayList<Batch> batches = new ArrayList<Batch>();
    for (int i = 0; i < ntasks; i++) {
        batches.add(new Batch(countDown));
    }

    long start = System.nanoTime();
    for (Batch r : batches){
        executor.execute(r);
    }

    // wait for all threads to finish their task
    try {
        countDown.await();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    long tmeasured = (System.nanoTime() - start);

    System.out.println("Task time= " + TASK_TIME + " ms");
    System.out.println("Number of tasks= " + ntasks);
    System.out.println("Number of threads= " + nthreads);
    System.out.println("Number of cores= " + ncores);
    System.out.println("Measured time= " + tmeasured);
    System.out.println("Theoretical serial time= " + TASK_TIME*1000000*ntasks);
    System.out.println("Theoretical parallel time= " + (TASK_TIME*1000000*ntasks)/ncores);
    System.out.println("Speedup= " + (serialTime*ntasks)/(double)tmeasured);

    executor.shutdown();
}
 }

而不是进行计算,每个批次只需等待一定的时间。程序计算了“加速比”,理论上始终为2,但如果“TASK_TIME”很小,则可能小于1(实际上是“减速”)。
我的计算需要最多1毫秒,通常更快。对于1毫秒,我发现速度略有提高约30%,但在实践中,使用我的程序,我注意到出现了“减速”。
这段代码的结构与我的程序非常相似,因此如果您能帮助我优化线程处理,我将非常感激。
此致敬礼。
以下是原始问题:
嗨。
我想在我的程序上使用多线程,因为我相信这可以大大提高效率。它的大部分运行时间归因于独立计算。
我的程序有数千个独立计算(解决几个线性系统),但它们仅以少量组的形式同时发生,每个组需要几毫秒才能运行。在这些计算组之一之后,程序必须按顺序运行一段时间,然后我必须再次解决线性系统。
实际上,可以看作要解决的这些独立线性系统位于迭代数千次的循环内,与依赖于先前结果的顺序计算交替进行。我加速程序的想法是通过将每个组分成(可用处理器数)个独立计算的批次来在并行线程中计算这些独立计算。因此,原则上根本没有排队。
我尝试使用FixedThreadPool和CachedThreadPool,但它比串行处理还要慢。每次需要解决批次时,似乎花费了太多时间创建新线程。
是否有更好的方法来处理这个问题?我使用的这些池似乎适用于每个线程需要更长时间而不是数千个较小的线程的情况...
谢谢!最好的问候!

能否贴一些代码?如果使用固定的线程池,它不会一遍又一遍地创建线程(而是重复利用)。 - Jeff Foster
你在哪个平台上运行这个程序?多核服务器和五年前的黑莓手机之间有很大的差别。 - MusiGenesis
@ursoouindio,我提出了使用阻塞队列的生产者/消费者模式,请查看我的答案以获取更多细节。 - Kiril
@Jeff:我需要创建一个类似的示例,因为我的代码依赖于几个类。我认为线程创建问题是由并行和顺序部分的交替所导致的。实际上,并行部分只涉及程序代码的一小部分。 - ursoouindio
@MusiGenesis,我在两台机器上使用Ubuntu Linux 10.04:一个Core 2 Duo T7250和一个Core 2 Quad Q6600。 - ursoouindio
6个回答

5

线程池不会一遍又一遍地创建新线程,这就是它们被称为“池”的原因。

你使用了多少个线程,你有多少个CPU/核心?系统负载如何(通常情况下,当你串行执行时以及使用线程池时)?是否涉及同步或任何类型的锁定?

并行执行的算法是否与串行执行完全相同(您的描述似乎表明串行执行重用了一些先前迭代的结果)。


谢谢回复,@Konrad。是的,线程池的数量不是问题,可能是我在代码中如何管理它们的问题。我会调用executor.execute(r)成千上万次,对于每个并行批处理r(而且executor只声明了一次:executor = Executors.newFixedThreadPool(numberOfCores);)。据我所知,它们会变成新线程(如果我错了,请纠正我)。我在两台机器上运行Ubuntu 10.04,一台双核,另一台四核。我使用的线程数=核心数。 - ursoouindio
使用更多的线程怎么样?例如numberOfProcessors+2?你是否为此实例化了数千个“Runnables”或“Callables”? - Konrad Garus
我测试了使用超过核心数量的情况,但当时并没有显示出改进。我不会一次性实例化所有的Runnables,而是在主循环内部执行此操作。 - ursoouindio
@Konrad,它不使用IO或共享资源,而是关于一组代数微分方程的集成。我正试图从其独立线性系统中获得好处。我只需提供初始条件,程序就可以自行运行。 - ursoouindio
关于开销,也许是这样的情况,尽管我希望它能提高效率。关于批次数量,这取决于一些因素,但现在大约有190个线性系统被调用了6850次,将被分成2或4个核心,在最后计算1312328个线性求解。 - ursoouindio
显示剩余7条评论

1

我不确定你是如何进行计算的,但如果你将它们分成小组,那么你的应用程序可能适合使用生产者/消费者模式。

此外,你可能会对使用BlockingQueue感兴趣。计算消费者将会阻塞,直到队列中有东西,并且阻塞发生在take()调用上。

private static class Batch implements Runnable {
    CountDownLatch countDown;
    public Batch(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    CountDownLatch getLatch(){
        return countDown;
    }

    @Override
    public void run() {         
        long t0 =System.nanoTime();
        long t = 0;
        while(t<TASK_TIME*1e6){ t = System.nanoTime() - t0; }

        if(countDown!=null) countDown.countDown();
    }
}

class CalcProducer implements Runnable {
    private final BlockingQueue queue;
    CalcProducer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while(true) { 
                CountDownLatch latch = new CountDownLatch(ntasks);
                for(int i = 0; i < ntasks; i++) {
                    queue.put(produce(latch)); 
                }
                // don't need to wait for the latch, only consumers wait
            }
        } catch (InterruptedException ex) { ... handle ...}
    }

    CalcGroup produce(CountDownLatch latch) {
        return new Batch(latch);
    }
}

class CalcConsumer implements Runnable {
    private final BlockingQueue queue;

    CalcConsumer(BlockingQueue q) { queue = q; }

    public void run() {
        try {
            while(true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... handle ...}
    }

    void consume(Batch batch) { 
        batch.Run();
        batch.getLatch().await();
    }
}

class Setup {
    void main() {
        BlockingQueue<Batch> q = new LinkedBlockingQueue<Batch>();
        int numConsumers = 4;

        CalcProducer p = new CalcProducer(q);
        Thread producerThread = new Thread(p);
        producerThread.start();

        Thread[] consumerThreads = new Thread[numConsumers];

        for(int i = 0; i < numConsumers; i++)
        {
            consumerThreads[i] = new Thread(new CalcConsumer(q));
            consumerThreads[i].start();
        }
    }
}

如果有任何语法错误,我很抱歉。我一直在编写C#代码,有时会忘记正确的Java语法,但是总体思路是正确的。


比起我上面的回答,这是一个更加详细(也可能更加正确)的实现。+1 - corsiKa
谢谢,@Lirik!我会花时间理解你的想法。 - ursoouindio
@ursoouindio 如果有什么不清楚的地方,请告诉我...基本思想类似于快餐店:你有一条顾客队列,同时有多个收银员。收银员在空闲时等待,直到有人排队,然后下一个可用的收银员从队列中叫号并为他们提供服务。顾客是生产者,而收银员是消费者。 - Kiril
我明白,@Lirik。我只是错过了CalcGroup类,但我相信这不是这个想法的重要部分。我已经更新了问题并附上了代码,你能帮我将你的想法应用到那个代码中吗?(如果你认为它会有效) - ursoouindio
@ursoouindio,我不知道你把一组计算称为 Batch(很好的名称),所以我使用了一个虚拟名称 CalcGroup,因此 CalcGroup 将被 Batch 替换。我已经考虑到这点更新了我的答案:请注意,虽然这样做足够有效,但等待 CountDownLatch 不是使用此模式的最佳方式。 - Kiril
@Lirik,感谢您的帮助,但我恐怕无法从排队中获益,因为我总是设法让批次数量与可用处理核心数相同。将我的计算任务分成更多批次实际上会减慢我的程序。但我会记住这个想法,非常感谢您。 - ursoouindio

1
如果您遇到的问题无法扩展到多个核心,那么您需要更改程序,或者您面临的问题不像您想象的那样并行化。我怀疑您还有其他类型的错误,但根据所给信息无法确定。
这段测试代码可能会有所帮助。
Time per million tasks 765 ms

代码

ExecutorService es = Executors.newFixedThreadPool(4);
Runnable task = new Runnable() {
    @Override
    public void run() {
        // do nothing.
    }
};
long start = System.nanoTime();
for(int i=0;i<1000*1000;i++) {
    es.submit(task);
}
es.shutdown();
es.awaitTermination(10, TimeUnit.SECONDS);
long time = System.nanoTime() - start;
System.out.println("Time per million tasks "+time/1000/1000+" ms");

编辑:假设您有一个循环,按顺序执行此操作。

for(int i=0;i<1000*1000;i++)
    doWork(i);

你可能会认为像这样改变循环会更快,但问题是开销可能比收益还要大。
for(int i=0;i<1000*1000;i++) {
    final int i2 = i;
    ex.execute(new Runnable() {
        public void run() {
            doWork(i2);
        }
    }
}

因此,您需要创建工作批次(每个线程至少一个),以确保有足够的任务来让所有线程保持繁忙状态,但是不要有太多任务,以免线程花费时间在开销上。

final int batchSize = 10*1000;
for(int i=0;i<1000*1000;i+=batchSize) {
    final int i2 = i;
    ex.execute(new Runnable() {
        public void run() {
            for(int i3=i2;i3<i2+batchSize;i3++)
               doWork(i3);
        }
    }
}

编辑2:运行一个在不同线程之间复制数据的测试。

for (int i = 0; i < 20; i++) {
    ExecutorService es = Executors.newFixedThreadPool(1);
    final double[] d = new double[4 * 1024];
    Arrays.fill(d, 1);
    final double[] d2 = new double[4 * 1024];
    es.submit(new Runnable() {
        @Override
        public void run() {
            // nothing.
        }
    }).get();
    long start = System.nanoTime();
    es.submit(new Runnable() {
        @Override
        public void run() {
            synchronized (d) {
                System.arraycopy(d, 0, d2, 0, d.length);
            }
        }
    });
    es.shutdown();
    es.awaitTermination(10, TimeUnit.SECONDS);
    // get a the values in d2.
    for (double x : d2) ;
    long time = System.nanoTime() - start;
    System.out.printf("Time to pass %,d doubles to another thread and back was %,d ns.%n", d.length, time);
}

一开始不太好,但在约50微秒后变得更加稳定。

Time to pass 4,096 doubles to another thread and back was 1,098,045 ns.
Time to pass 4,096 doubles to another thread and back was 171,949 ns.
 ... deleted ...
Time to pass 4,096 doubles to another thread and back was 50,566 ns.
Time to pass 4,096 doubles to another thread and back was 49,937 ns.

ExecutorService es = Executors.newFixedThreadPool(4); 的结果为:每一百万个任务的时间为1561毫秒 - ursoouindio
抱歉,也许我错过了某些内容或者表述不太清晰。实际上,我的经验不是很丰富。我在问题中添加了一些代码,也许它可以阐明情况。 - ursoouindio
这只是一个非常小的例子,正如你所说,它仍然更快,但你看到任务运行得更慢。既然你在睡觉,应该可以尝试10个线程和1000个任务。 - Peter Lawrey
每个double[]相当于12个doubles(两个用于开销/对象头),160个这样的数组需要大约2K个doubles,如果数据需要双向传递,则大约需要4K个doubles,这可能需要120微秒。这很重要,但我认为不是问题的原因。在您的CPU上可能会加倍。我认为您需要缩小问题范围,看看较小的样本如何表现,以便确定延迟来自哪里。尝试传递一个大的double[],然后进行clone()并传回。 - Peter Lawrey
我注意到的一件事是,如果你有少量的任务,它们大约需要1毫秒的时间。可能是因为系统还没有热起来。 - Peter Lawrey
显示剩余23条评论

1
据我所读:“成千上万的独立计算……同时发生……需要几毫秒才能运行”,我认为您的问题非常适合使用GPU编程。
我认为这回答了您的问题。GPU编程变得越来越流行。CUDA和OpenCL都有Java绑定。如果您可以使用它,我建议您去尝试一下。

0
这里是我思考的伪大纲。
class WorkerThread extends Thread {

    Queue<Calculation> calcs;
    MainCalculator mainCalc;

    public void run() {
        while(true) {
            while(calcs.isEmpty()) sleep(500); // busy waiting? Context switching probably won't be so bad.
            Calculation calc = calcs.pop(); // is it pop to get and remove? you'll have to look
            CalculationResult result = calc.calc();
            mainCalc.returnResultFor(calc,result);      
        }
    }


}

如果您正在调用外部程序,则另一种选择是不要将它们放在逐个执行的循环中,否则它们不会并行运行。您可以将它们放在逐个处理的循环中,但不能逐个执行它们。

Process calc1 = Runtime.getRuntime.exec("myCalc paramA1 paramA2 paramA3");
Process calc2 = Runtime.getRuntime.exec("myCalc paramB1 paramB2 paramB3");
Process calc3 = Runtime.getRuntime.exec("myCalc paramC1 paramC2 paramC3");
Process calc4 = Runtime.getRuntime.exec("myCalc paramD1 paramD2 paramD3");

calc1.waitFor();
calc2.waitFor();
calc3.waitFor();
calc4.waitFor();

InputStream is1 = calc1.getInputStream();
InputStreamReader isr1 = new InputStreamReader(is1);
BufferedReader br1 = new BufferedReader(isr1);
String resultStr1 = br1.nextLine();

InputStream is2 = calc2.getInputStream();
InputStreamReader isr2 = new InputStreamReader(is2);
BufferedReader br2 = new BufferedReader(isr2);
String resultStr2 = br2.nextLine();

InputStream is3 = calc3.getInputStream();
InputStreamReader isr3 = new InputStreamReader(is3);
BufferedReader br3 = new BufferedReader(isr3);
String resultStr3 = br3.nextLine();

InputStream is4 = calc4.getInputStream();
InputStreamReader isr4 = new InputStreamReader(is4);
BufferedReader br4 = new BufferedReader(isr4);
String resultStr4 = br4.nextLine();

谢谢,@glowcoder!我的程序中没有使用队列,如果我有4个核心,我只需创建4个批次并调用4个可运行对象,解决它们并跟随代码的顺序部分,直到再次需要独立计算。 - ursoouindio
这就是我想说的。如果你在线程中添加一个队列,你可以使用同一个线程,并在计算可用时给它解决问题。然后,你的主要计算只需等待获取计算结果即可。 - corsiKa
好的,我想我明白你的意思了。在我的代码中,独立的计算可以通过c代码(通过JNI实现)调用。我需要再考虑一下如何应用这个想法,但是目前我没有进行大规模重构的计划。还有一件事,我不能extend Thread,因为并行例程扩展了我自己的基本类... - ursoouindio
关于JNI:您也可以使用外部进程而不是使用线程。我会将这个想法编辑到帖子中。关于扩展Thread:您始终可以“实现Runnable”并且“Thread worker = new Thread(new Worker())”。 - corsiKa
实际上,JNI 代码在主循环中运行,它调用了多个 Java 方法,其中一些我正试图并行化。 - ursoouindio

0

嗯,CachedThreadPool 似乎是专门为您的情况创建的。如果您很快就重用线程,它不会重新创建线程,如果您在使用新线程之前花费了整整一分钟,线程创建的开销相对较小。

但是,除非您也可以并行访问数据,否则不能指望并行执行加速计算。如果您使用大量锁定、许多同步方法等,您将花费更多的开销而不是获得并行处理的收益。检查您的数据是否可以高效地并行处理,并且代码中没有非明显的同步问题。

此外,如果数据完全适合缓存,CPU 可以有效地处理数据。如果每个线程的数据集大于一半的缓存,则两个线程将竞争缓存并发出许多 RAM 读取,而一个线程(如果仅使用一个核心)可能表现更好,因为它避免了在执行紧密循环时进行 RAM 读取。也要检查这个问题。


谢谢你的回答!在我的情况下,它永远不需要等待60秒才能进入并行部分。这是我程序中最频繁的部分。你认为CachedThreadPool更好吗,即使计划始终使用相同数量的并行线程? - ursoouindio
关于锁定和同步,这不是我的情况,因为我的程序在这方面具有更简单的结构。每个线程只需要双重数组。它们必须在每个批次中计数到2000个元素。 - ursoouindio

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接