被拒绝执行异常的原因可能是什么?

90

在我的Tomcat服务器(+ liferay)上出现了这个异常。

java.util.concurrent.RejectedExecutionException

我的类是这样的:

public class SingleExecutor extends ThreadPoolExecutor {
  public SingleExecutor(){
    super(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  }

  @Override
  public void execute(Runnable command) {
    if(command instanceof AccessLogInsert){
        AccessLogInsert ali = (AccessLogInsert)command;
        ali.setConn(conn);
        ali.setPs(ps);
    }
    super.execute(command);
  }
}

我在super.execute(command);这行代码中遇到了这个异常。当LinkedBlockingQueue的大小为2^31时,可能会出现此错误,但我确定没有那么多命令在等待。

一开始一切都很稳定,但是在重新部署war之后,它就开始出现了。这个类不是war的一部分,而是在tomcat/lib中的一个jar包中。

你有任何想法为什么会发生这种情况以及如何解决吗?

2个回答

99

来自 ThreadPoolExecutor JavaDoc(重点标注为我的翻译)

execute(java.lang.Runnable) 方法提交的新任务将在以下情况下被拒绝:Executor 已关闭时;当 Executor 对最大线程数和工作队列容量都设置了有限制的上限并且已饱和。在任一情况下,执行方法都会调用其 RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。提供了四个预定义的处理程序策略:

  1. 默认的 ThreadPoolExecutor.AbortPolicy 策略会在拒绝时抛出 RejectedExecutionException 异常。
  2. ThreadPoolExecutor.CallerRunsPolicy 策略使调用 execute 方法的线程执行该任务。这提供了一种简单的反馈控制机制,可以减缓提交新任务的速率。
  3. ThreadPoolExecutor.DiscardPolicy 策略会直接丢弃无法执行的任务。
  4. ThreadPoolExecutor.DiscardOldestPolicy 策略在执行器未关闭时将删除工作队列头部的任务,然后重试执行(这可能会再次失败,导致重复)。

可以定义并使用其他类型的 RejectedExecutionHandler 类。这样做需要特别小心,特别是当策略只适用于特定容量或排队策略时。

因此,重新加载 war 文件会触发 Executor 的关闭。尝试将相关库放入 war 文件中,这样 Tomcat 的 ClassLoader 就更有可能正确地重新加载您的应用程序。


2
答案的最后一部分很不错。 - Ravindra babu
1
当执行器已关闭时,使用execute(java.lang.Runnable)方法提交的新任务将被拒绝。这在我的代码中引起了一个bug,我通过在关闭后使线程休眠500毫秒(这可能不是必要的),然后将调度程序设置为null来解决它,以便下次需要运行任务时,相关方法会检查调度程序是否为null。如果是,则创建一个新的调度程序。因此,由于关闭而导致的拒绝被消除了。 - Agi Hammerthief
2
@AgiHammerthief,睡觉并不是必要的,但适当的并发控制是必须的。此外,虽然你的“解决方案”可能会防止崩溃,但它只是掩盖了一个听起来很大的资源所有权问题。 - OrangeDog
@OrangeDog讲得非常好。但是假设在经过适当的测试后,我的应用程序从未发生过这种情况。它只在生产环境中发生了一次。这可能是由于其他外部因素引起的,例如DB服务器宕机或Kafka服务器问题。增加最大池和队列容量是否是解决此问题的方法? - Dev

20

补充一下OrangeDog所说的,一个Executor的合同确实是这样的,即当执行器饱和(即队列中没有空间)时,它的execute方法将抛出RejectedExecutionException

然而,如果它可以阻塞,则会更有用,自动等待直到队列中有空间来处理新任务。

使用以下自定义BlockingQueue就可以实现这一点:

public final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {

    public ThreadPoolQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(Runnable e) {
        try {
            put(e);
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

}

这基本上实现了反压算法,当执行器饱和时减缓生产者。

使用方法如下:

int n = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, n, 1, TimeUnit.MINUTES, new ThreadPoolQueue(n));
for (Runnable task : tasks) {
    executor.execute(task); // will never throw, nor will queue more than n tasks
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);

1
为了实现这一点,CallerRunsPolicy 已经存在,并且比简单阻塞具有更好的吞吐量。 - OrangeDog
1
不会,因为通过让生产者运行任务来阻塞它是一种性能降低的方式;这将导致池陷入饥饿状态,而生产者则忙于处理单个任务。 - rustyx
这次对话是这个页面最好的部分。rustyx说它是“pessimization”,他是正确的。考虑到手头的问题,我认为解决方案应该是上面两种之一。CallerRunsPolicy确实存在,并且使用起来很便宜。如果它真的导致池饥饿,那么最好实现阻塞。 - booFar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接