500个工作线程,需要哪种线程池?

8

我在想这是否是最佳的做法。我有大约500个线程无限运行,但在完成一个处理周期后会休眠一分钟。

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

正在执行的代码非常简单,基本上只有这个。
class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

我需要为每个正在运行的任务单独创建一个线程,因此改变架构不是一个选项。我尝试将我的线程池大小设置为Runtime.getRuntime().availableProcessors(),以尝试运行所有500个线程,但仅允许其中8个(4x超线程)执行。其他线程不会放弃并让其他线程轮流执行。我尝试使用wait()和notify(),但是仍然没有运气。如果有人有一个简单的示例或一些技巧,我将非常感激!
嗯,设计可以说是有缺陷的。这些线程实现了遗传编程或GP,一种学习算法。每个线程分析高级趋势并进行预测。如果线程完成,学习就会丢失。也就是说,我希望sleep()能够在一个线程“不学习”时允许我共享一些资源。
因此,实际要求是:如何安排维护状态并每2分钟运行一次的任务,但控制同时执行的数量。

2
我不清楚你想做什么。 为什么不尝试运行所有线程呢?就像这样:for(int i....) { ((Thread)coreAppVector.elementAt(i)).start(); } - Andrea Polci
3
这里的现实情况是,使用500个线程需要比使用8个线程花费更长的时间。只有8个独立处理器会导致大量的上下文切换,这将导致每个2秒的计算时间变得更长。 - John Vint
2
您的情况下,500个线程至少太多了492个。因为您正在使用超线程技术,实际上并没有8个真正的CPU。我建议您使用轮询配置中的4个线程,这样可能可以获得最佳性能。 - Daniel Pryden
1
但是它们一定要是Thread对象吗?您可以将它们设置为Runnable并让它们保留此重要的内部状态,然后根据系统可管理的线程数量,在适当的时候调用run方法,这个数量可能超过4或8,如果任务的一部分被阻塞在网络/数据库/任何IO上。 - Affe
1
仅仅因为有八个(逻辑)核心,并不意味着最佳性能将来自于八个线程。如果线程是CPU绑定的,那么八个线程会更接近正确,但如果线程是IO绑定的,那么拥有更多的线程可能是有意义的。总体上同意这个想法,但必须小心控制上下文切换。 - user41871
显示剩余14条评论
11个回答

13
如果您的线程没有终止,那么这是线程内部代码的问题,而不是线程池的问题。如果您需要更详细的帮助,您需要发布正在执行的代码。
此外,为什么要在每个Thread完成后让其睡眠;让它完成不是更好吗?
另外,我认为您正在错误地使用线程池,因为您有一些线程数等于要执行的任务数。线程池的目的是对所使用资源的数量进行约束;这种方法并不比不使用线程池好。
最后,您不需要将Thread实例传递给ExecutorService,只需要传递Runnable实例即可。ExecutorService维护自己的线程池,这些线程无限循环,从内部队列中拉取工作(工作就是您提交的Runnable)。

2
正确。执行器不知道Runnable正在睡眠,直到Runnable完成,执行器才认为它正在运行。 - Steve Kuo

10

为什么不使用ScheduledExecutorService定时每分钟运行一次任务,而不是让所有这些线程在整整一分钟内保持空闲状态?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

你所说的"更改架构不是一种选择"是什么意思? 如果你的意思是你根本不能修改任务(特别是任务必须循环运行,而不能仅运行一次,并且不能删除对Thread.sleep()的调用),那么"良好的性能也不是一种选择"。


1
显然,他不应该使用线程,而应该实现“Runnable”,每个“run()”在单个计算后应该终止。 - erickson
1
如果每个可运行对象不在一个无限循环中,并且在完成单次运行后退出运行方法,则此解决方案将是OP正在寻找的最佳解决方案。 - John Vint

3
我不确定您的代码在使用线程池时是否语义上正确。ExecutionService在内部创建和管理线程,客户端只需提供一个Runnable实例,其run()方法将在池化线程的上下文中执行。您可以查看我的示例。还要注意,每个运行的线程需要大约10MB的系统内存用于堆栈,并且在Linux上,Java到本地线程的映射是1对1。

2

不要让线程休眠,而应该让它返回并使用ThreadPoolExecutor来执行每分钟发布到工作队列的任务。


2
回答你的问题,需要哪种类型的线程池? 我发表了我的评论,但这确实应该解决你的问题。您有一个计算需要2秒钟才能完成。您有很多任务(500个),希望尽快完成。假设没有IO和/或网络流量,则可以实现最快的吞吐量,即使用Runtime.getRuntime().availableProcessors()数量的线程。 如果将线程数增加到500个,则每个任务都将在自己的线程上执行,但操作系统将定期调度一个线程以提供给另一个线程。在任何给定时间,这是125个上下文切换。每个上下文切换都会增加每个任务运行的时间。 大局在此:当您超过处理器数量时,添加更多线程并不等于更高的吞吐量。 编辑:快速更新。您不需要在此处睡眠。当您使用8个处理器执行500个任务时,每个任务将在2秒钟内完成,完成并且正在运行的线程将接下来的任务并完成该任务。

1
这应该能够满足您的需求,但不是您所请求的 :-) 您必须删除 Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

这将安排Runnables每10秒显示行为。如果您确实需要在处理完成后等待固定的时间之后,您可能需要尝试使用哪种.scheduleXXX方法。我认为fixedWait将会每隔N段时间运行它,而不管执行时间是多少。


1

假设这是CPU密集型处理。如果线程是I/O密集型,则不正确。 - user41871
@willie Wheeler - 在问题的评论中,他说他正在执行计算。可以安全地假设他是受CPU限制的。你是读了还是只是草率回答? - Romain Hippeau

0
我确实需要为每个正在运行的任务创建单独的线程,因此更改架构不是一个选项。
如果这是真的(例如,调用外部阻塞函数),那么为它们创建单独的线程并启动它们。您无法创建具有有限线程数的线程池,因为其中一个线程中的阻塞函数将防止任何其他可运行线程进入它,并且使用每个任务一个线程创建线程池不会带来太多好处。
我尝试将我的线程池大小设置为Runtime.getRuntime().availableProcessors(),它尝试运行所有500个线程,但只允许8个(4xhyperthreading)执行。
当您将创建的Thread对象传递给线程池时,它只看到它们实现了Runnable接口。因此,它将运行每个Runnable直到完成。任何停止run()方法返回的循环都不会允许下一个排队的任务运行;例如:
public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

输出如下:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

展示前 (线程池大小) 的任务在被调度之前已经完成。

你需要做的是创建一些运行一段时间,然后让其他任务运行的任务。你如何组织这些任务取决于你想要实现什么。

  • 你是否希望所有的任务同时运行,等待一分钟后再次同时运行,还是这些任务不同步
  • 你是否真的希望每个任务以一分钟为间隔运行
  • 你的任务是否有可能阻塞,因此需要单独的线程
  • 如果一个任务阻塞时间超过了预期的运行窗口,那么预期的行为是什么
  • 如果一个任务阻塞时间超过了重复率 (阻塞超过了一分钟),那么预期的行为是什么

根据这些答案,可以使用 ScheduledExecutorService、信号量或互斥锁来协调任务。最简单的情况是非阻塞、非同步任务,在这种情况下,直接使用 ScheduledExecutorService 每分钟运行一次可运行实例。


正如已经提到的,要注意使用yield()会引入“忙等待”,应该避免使用。 - Andrea Polci
我以为你说这不是“忙等待”,那到底是什么呢? - user177800

0

你能否重写你的项目,使用一些基于代理的并发框架,比如Akka


-1
你需要一个信号量。
class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

在实例化 AThreads 时,您需要传递相同的信号量实例:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

编辑:谁投了反对票可以解释一下为什么吗?我的解决方案有问题吗?


OP没有提到任何关于同步的事情。这完全是离题了。 - SimonC
OP询问如何确保在任何时刻只有固定数量的线程处于“活动”状态,这是一个同步问题。也许我的解决方案不是最优的(我喜欢被接受的解决方案),但它肯定不是离题的,因为它解决了这个问题。 - Andrea Polci

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接