如何避免Java ExecutorService任务队列超负荷?

3
我有以下代码片段,可以正常运行。但问题是它一开始就创建并将2000多个任务放在执行器队列中。
我需要检查执行器队列中的任务是否已经完成,只有在这种情况下才会添加更多任务。它不需要很精确,例如,如果队列中只剩下<10个任务,则再添加50个。
这样,执行器任务队列就不会有那么多待处理任务,这也将使关闭()能够及时工作,否则即使调用了该函数,执行器仍然会尝试先完成其队列中的所有2000个任务。
如何最好地实现这一点?谢谢。
executor = Executors.newFixedThreadPool(numThreads);

while(some_condition==true)
{
    //if(executor < 10 tasks pending)  <---- how do i do this?
    //{                             
        for(int k=0;k<20;k++)
        {  
            Runnable worker = new MyRunnable();
            executor.execute(worker);
        }
    //}
    //else 
    //{
    //      wait(3000);
    //}
} 

使用信号量进行更新:
private final Semaphore semaphore = new Semaphore(10)
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample();

while(some_condition==true)
{

        Runnable worker = new MyRunnable();
        //So at this point if semaphore is full, then while loop would PAUSE(??) until
        //semaphore frees up again.
          executor.execute(worker);   
} 
4个回答

8
我有以下代码片段,可以正常运行。但是问题是它会立即创建并放置超过2000个任务到执行器队列中。
一种方法是使用自己的ThreadPoolExecutor,限制作业队列并在其上设置自定义RejectedExecutionHandler。这样可以对排队的作业数量进行细粒度控制。
您需要自定义处理程序,因为默认情况下,如果队列已满,ThreadPoolExecutor.submit(...)将抛出RejectedExecutionException。使用下面的自定义处理程序时,当它被队列拒绝时,拒绝处理程序会将其放回,并阻塞直到队列有空间。因此,不会拒绝/丢弃任何作业。
以下是大致如何启动自己的线程池并设置自己的拒绝处理程序。
// you can tune the blocking queue size which is the number of jobs to queue
// when the NUM_THREADS are all working
final BlockingQueue<MyRunnable> queue =
    new ArrayBlockingQueue<MyRunnable>(NUM_JOBS_TO_QUEUE);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the job that fills the queue, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full as opposed to throwing
      executor.getQueue().put(r);
   }
});
...
// now submit all of your jobs and it will block if the queue is full
for(int k = 0; k < 20000000; k++) {  
   Runnable worker = new MyRunnable();
   threadPool.execute(worker);
}

关于阻塞线程池的更多细节,请参见我在此处的答案:

如何使ThreadPoolExecutor命令等待,如果有太多数据需要处理?

您还可以使用ThreadPoolExecutor.CallerRunsPolicy,这将导致提交作业到线程池的调用者执行该作业。然而,我不喜欢这个解决方案,因为它会阻塞调用者直到作业完成,这可能会使其他工作线程饥饿。而且,如果有多个提交者,它可能仍然会导致太多的线程运行作业。

最后,请注意,我将ThreadPoolExecutor中的核心线程数和最大线程数设置为相同的数字。不幸的是,默认情况下,执行器启动核心线程,然后填充队列,只有在达到最大线程数时才分配其他线程。这完全违反直觉。


但问题是我不想拒绝任务或提出任何问题,我只想等待任务队列有较少的挂起任务,然后添加这些任务。创建一个计数器(int tasksRunning = 0)是否可行?每次调用execute()时,tasksRunning++,在runnable.run()完成时执行tasksRunning--,谢谢。 - user1539050
我的解决方案不会拒绝任务,这就是重点。我将更明确地告诉@user1539050。 - Gray
请参考以下链接:https://dev59.com/i3I-5IYBdhLWcg3wI0t9 - wrschneider

7
您可以使用一个简单的信号量(Semaphore)。在提交时获取一个新的许可证,在完成后释放该许可证,以允许任何等待提交的人进行提交。
private final Semaphore semaphore = new Semaphore(10);//or however you want max queued at any given moment
ThreadPoolExecutor tp= new ThreadPoolExecutor(...){
      public void execute(Runnable r){
          semaphore.acquire();
          super.execute(r);
      }    
      public void afterExecute(Runnable r, Thread t){
         semaphore.release();  
         super.afterExecute(r,t);
      }
};

如果没有更多的许可证可用,这里提交的线程将被暂停。


不错的建议,John。可能建议捕获和处理RejectedExecutionException或者小心确保你的信号量值小于阻塞队列限制(如果有的话)。 - Gray
信号量数值(10)并不是最大排队数,而是运行+排队的数量。 - Gray
@Gray 你说的两点都对,很好。 我把这个作为一个起点放上来,但如果 OP 要实现它,应该考虑这些建议。 - John Vint
谢谢John和Gary,我认为这正是我所需要的。但为了确认,你们能否看一下我的回复,其中包含一些额外的代码问题。谢谢。 - user1539050

3
我通常使用对象池队列来限制此类系统,该队列是一个填充了X个任务的BlockingQueue。任何想要向线程提交任务的东西都必须从池队列中获取一个,将其加载数据后再提交它。
当任务完成并处理结果时,它会被推回到池队列以便重复使用。
如果池为空,则提交线程会在池队列上阻塞,直到返回一些任务。
这本质上是一种信号量控制,正如@John Vint所建议的那样,但还具有其他优点——例如没有可运行对象的持续创建/垃圾回收。我喜欢定时将PooolQueue.size转储到GUI状态栏,这样我就可以看到系统有多“繁忙”(并快速检测任何对象泄漏)。

2

为了避免过载线程池,最好设置一个拒绝策略。我发现最简单的方法是像这样做:

您将更好地设置拒绝策略,因为您不想使线程池过载。为了实现这一点而不会使自己过于复杂化,我发现最好的方法是采用以下方法:

final ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(THREADS_COUNT);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

一旦所有线程都忙碌,调用方的线程将执行任务。这是一种名为“CallerRunsPolicy”的策略,可参考此页面:CallerRunsPolicy JavaDoc

不会给这个贴投反对票,但是要注意强制类型转换很危险,因为它并不一定是 ThreadPoolExecutor。 - KyleM
ExecutorService是返回实例的类型,它没有拒绝执行策略的setter。如果您不将其向下转换,则无法执行该操作。在这种情况下,为了通过不排队更多作业来控制执行,请使用troutle。 - Guido Medina
执行器通常比较棘手,需要Oracle/Sun的一些工作,例如队列中没有拒绝策略来阻止线程在获取作业时进行阻塞。为了支持这样的功能,我不得不采取一些不被推荐的方法,在我们公司进行支持,直到框架编写者将其改进为止。 - Guido Medina

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接