强制多个线程在有多个可用CPU时使用它们

71

我正在编写一款Java程序,由于它所执行的任务特性,它需要大量的CPU资源。不过,其中许多部分可以并行运行,并且我已经将我的程序设计成多线程的形式。但是,当我运行它时,似乎只使用了一个CPU,直到需要更多的CPU资源时才使用其他CPU - 那么,在Java中有没有办法强制不同的线程在不同的核心/ CPU上运行呢?


3
我不确定你的问题是否清晰,现在我想一想。你是在问(a)如何使其在多个线程中运行,(b)为什么多线程代码没有使用超过一个核心,还是(c)为什么CPU负载不均匀? - BobMcGee
你的应用程序没有足够的任务可以独立运行,以便一次使用多个CPU。问题极不可能出现在操作系统中,因为这些已经经过数百万人多年的测试。你应该再次查看你的程序,看看你期望同时运行哪些任务,并尝试确定是什么阻止了这种情况的发生。 - Peter Lawrey
2
Peter,如果一个应用程序正在运行两个线程,则它有足够的东西可以在多个核心上运行。即使所有其他线程所做的只是启动和终止,仍然有一个工作负载可用于在第二个核心上运行。——将单个核心分配给多个线程,仅因为它们每个人在此时似乎没有高工作量,这是不可取的。那么,对于同步到某些非CPU工作负载(例如一般附加计算板发出信号表明它已完成其工作负载)的想法呢?线程亲和力非常重要!Java应该支持这一点。 - Rick Hodgin
10个回答

62
有两种基本的Java多线程方法。使用这些方法创建的每个逻辑任务应在需要和可用时在新的核心上运行。
方法一:定义一个Runnable或Thread对象(可以在构造函数中接受Runnable),并使用Thread.start()方法开始运行。它将在操作系统分配的任何核心上执行--通常是负载较轻的核心。
教程:定义和启动线程 方法二:定义实现Runnable(如果它们不返回值)或Callable(如果它们返回值)接口的对象,其中包含您的处理代码。将它们作为任务传递给java.util.concurrent包中的ExecutorService。java.util.concurrent.Executors类有许多方法来创建标准、有用的ExecutorServices。链接到Executor教程。
从个人经验来看,Executors固定和缓存线程池非常好,尽管您需要调整线程计数。Runtime.getRuntime().availableProcessors()可以在运行时用于计算可用核心数。您需要在应用程序完成后关闭线程池,否则应用程序将无法退出,因为ThreadPool线程仍在运行。
获得良好的多核性能有时很棘手,充满了陷阱:
  • 当并行运行时,磁盘I/O会大大减慢。每次只有一个线程应该进行磁盘读写。
  • 对象同步为多线程操作提供了安全性,但会减缓工作速度。
  • 如果任务太琐碎(小的工作量,执行快),则在ExecutorService中管理它们的开销比从多个核心获得的收益更高。
  • 创建新的Thread对象很慢。如果可能的话,ExecutorServices将尝试重用现有线程。
  • 当多个线程同时处理某些内容时,各种疯狂的事情都可能发生。保持系统简单,并尝试使任务逻辑上不相互影响。

还有另一个问题:控制工作很难!一个好的实践是有一个管理线程来创建和提交任务,然后有几个带有工作队列的工作线程(使用ExecutorService)。

我只是简要介绍了关键点--许多专家认为,多线程编程是最难的编程主题之一。 它是非直观的、复杂的,而且抽象通常很弱。


编辑 -- 使用 ExecutorService 的示例:

public class TaskThreader {
    class DoStuff implements Callable {
       Object in;
       public Object call(){
         in = doStep1(in);
         in = doStep2(in);
         in = doStep3(in); 
         return in;
       }
       public DoStuff(Object input){
          in = input;
       }
    }

    public abstract Object doStep1(Object input);    
    public abstract Object doStep2(Object input);    
    public abstract Object doStep3(Object input);    

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Callable> tasks = new ArrayList<Callable>();
        for(Object input : inputs){
           tasks.add(new DoStuff(input));
        }
        List<Future> results = exec.invokeAll(tasks);
        exec.shutdown();
        for(Future f : results) {
           write(f.get());
        }
    }
}

太棒了!我去阅读了更多关于这个主题的内容,因为我不清楚Executors的优点。我还不确定其他的东西,但是FixedThreadPool似乎很好,因为它限制了正在运行的线程数量,(1)避免任务变化导致负载过重,(2)确保一些线程先完成(并快速获得一些结果)。这对于进行实验非常有用。 - toto_tico

31
当我运行它时,它似乎只使用一个CPU,直到需要更多的时候才会使用另一个CPU - 在Java中是否有任何方法可以强制不同的线程在不同的核心/ CPU上运行?
我理解你问题的这一部分意思是指你已经解决了使应用程序具备多线程能力的问题。尽管如此,它并不会立即开始使用多个内核。
“是否有任何方法可以强制…”的答案(据我所知)是没有直接的方法。您的JVM和/或主机操作系统决定使用多少“本地”线程以及如何将这些线程映射到物理处理器。您可以进行一些调整选项。例如,我发现this page谈论了如何在Solaris上调整Java线程。this page谈论了其他可能导致多线程应用程序变慢的因素。

20

首先,你需要证明你的程序在多个核心上运行会更快。许多操作系统会尽可能地将程序线程运行在同一核心上。

在同一核心上运行有很多优点。CPU缓存是热的,这意味着该程序的数据已加载到CPU中。锁定/监视/同步对象在CPU缓存中,这意味着其他CPU不需要在总线上执行缓存同步操作(代价高昂!)。

可以非常容易地使您的程序始终在同一CPU上运行的一个问题是过度使用锁定和共享内存。您的线程不应相互通信。线程越不经常在同一内存中使用相同的对象,它们就越经常在不同的CPU上运行。它们越经常使用相同的内存,它们就越经常必须阻塞等待其他线程。

每当操作系统看到一个线程为另一个线程阻塞时,它将尽可能在同一CPU上运行该线程。这减少了在CPU之间移动的内存量。我猜这就是导致您程序中出现的情况。


8

首先,我建议阅读"Java并发编程实战" by Brian Goetz

alt text

这是迄今为止最好的描述并发Java编程的书籍。
并发编程“易学难精”。我建议在尝试之前多阅读有关该主题的内容。很容易使多线程程序在99.9%的时间内正确工作,但在0.1%的时间内失败。但是,以下是一些入门提示:
有两种常见方法可以使程序使用多个核心:
1. 使程序使用多个进程运行。一个例子是使用Pre-Fork MPM编译的Apache,它将请求分配给子进程。在多进程程序中,默认情况下不共享内存。但是,您可以在进程之间映射共享内存的部分。Apache使用其“记分牌”进行此操作。
2. 使程序具有多线程功能。在多线程程序中,默认情况下所有堆内存都是共享的。每个线程仍然有自己的堆栈,但可以访问堆的任何部分。通常,大多数Java程序都是多线程而不是多进程。

在最低层次上,可以 创建和销毁线程。Java使得以可移植的跨平台方式创建线程变得容易。

由于频繁地创建和销毁线程往往会变得昂贵,因此Java现在包括Executors来创建可重用的线程池。任务可以分配给执行器,并且可以通过Future对象检索结果。

通常,人们有一个任务可以分成更小的任务,但最终结果需要合并在一起。例如,在归并排序中,可以将列表分成越来越小的部分,直到每个核心都进行排序。但是,由于每个子列表都已排序,因此需要合并它们以获得最终排序列表。由于这种“分而治之”的问题相当普遍,因此有一个JSR框架可以处理底层的分发和连接。这个框架可能会包含在Java 7中。

1
JSR 166y框架已经被包含在Java 7中的java.util.concurrent包的ForkJoinPool和ForkJoinTask类中。http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html - buzz3791

4

1

您可以使用Java 8版本中的Executors API。

public static ExecutorService newWorkStealingPool()

创建了一个工作偷懒的线程池,使用所有可用的处理器作为其目标并行度水平。

由于工作偷懒的机制,空闲线程从忙碌线程的任务队列中窃取任务,整体吞吐量将增加。

grepcode看,newWorkStealingPool的实现如下:

/**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

1

最简单的方法是将程序分成多个进程。操作系统会将它们分配到各个核心上。

稍微困难一些的方法是将程序分成多个线程,并信任JVM适当地分配它们。这通常是人们利用可用硬件的方式。


编辑

一个多进程程序如何变得“更容易”?这是管道中的一步。

public class SomeStep {
    public static void main( String args[] ) {
        BufferedReader stdin= new BufferedReader( System.in );
        BufferedWriter stdout= new BufferedWriter( System.out );
        String line= stdin.readLine();
        while( line != null ) {
             // process line, writing to stdout
             line = stdin.readLine();
        }
    }
}

管道中的每个步骤都具有类似的结构。包括任何处理在内,需要9行开销。

这可能不是绝对最有效的方法。但它非常简单。


您的并发进程的整体结构不是JVM问题,而是操作系统问题,因此请使用Shell。

java -cp pipline.jar FirstStep | java -cp pipline.jar SomeStep | java -cp pipline.jar LastStep

唯一剩下的就是为管道中的数据对象设计一些序列化。标准序列化效果很好。阅读http://java.sun.com/developer/technicalArticles/Programming/serialization/以获取有关如何进行序列化的提示。您可以使用ObjectInputStreamObjectOutputStream替换BufferedReaderBufferedWriter来实现这一点。

6
相较于多线程应用程序,为什么多进程应用程序实现起来会更简单 - Michael Borgwardt
2
不确定多进程一定有帮助--取决于你的操作系统,它可能已经在线程级别上进行调度了。 - Neil Coffey
1
@Lott:但如果你的目标是性能,那么这对你并没有太大好处,不是吗?你基本上是在制作一个更慢的消息传递接口版本。我同意分离处理阶段,但为什么要通过Stream来实现呢?使用工作队列和工作线程不是更好吗? - BobMcGee
2
@Lott 如果仅考虑速度,C语言确实更快--问题在于Java的流I/O在每次I/O调用时都要进行同步和检查,而不是管道。同时,这也不会更容易--如果使用标准输出/输入,你需要定义通信协议并处理可能存在的解析问题。也别忘了写入 StdOut 时会出现异常!使用管理线程、ExecutorServices和Runnable/Callable任务更容易实现。这可以用非常简单的代码(带有错误检查)实现,在100行以内,潜在地非常快,而且性能良好。 - BobMcGee
1
@BobMcGee:如果你的工作负载设计不良并且花费大量时间启动子进程,那么VM启动只会是一种惩罚。如果你的工作负载设计良好,你的进程运行时间很长,启动开销就会在长时间运行时间内分摊。 - S.Lott
显示剩余16条评论

1

我认为这个问题与Java并行处理框架(JPPF)有关。使用它,您可以在不同的处理器上运行不同的作业。


1

+1 for the reference. 这个PDF的链接好像失效了,如果你还有PDF的标题,能否分享一下? - Sundeep

1
你应该编写程序,以一堆 Callable 的形式完成工作,并将它们交给 ExecutorService 并使用 invokeAll(...) 执行。
然后,你可以在运行时从 Executors 类中选择适当的实现。建议使用 Executors.newFixedThreadPool() 方法,并传入一个大致对应于要保持繁忙的 CPU 核心数量的数字。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接