ThreadPoolExecutor policy

13

我尝试使用ThreadPoolExecutor来调度任务,但在其策略方面遇到了一些问题。以下是它的声明行为:

  1. 如果运行的线程少于corePoolSize,则执行程序始终倾向于添加新线程而不是排队。
  2. 如果有corePoolSize或更多的线程正在运行,则执行程序始终倾向于将请求排队而不是添加新线程。
  3. 如果无法将请求排队,则会创建一个新线程,除非这将超过maximumPoolSize,在这种情况下,任务将被拒绝。

我想要的行为是:

  1. 同上
  2. 如果运行的线程数在corePoolSize和maximumPoolSize之间,则优先添加新线程而不是排队,并且优先使用空闲线程而不是添加新线程。
  3. 同上

基本上,我不希望有任何任务被拒绝,我希望它们在一个无限制的队列中排队。但是我确实希望最多有maximumPoolSize个线程。如果我使用无界队列,当它达到coreSize时,它永远不会生成线程。如果我使用有界队列,则会拒绝任务。有没有什么解决方法?

现在我考虑的是在SynchronousQueue上运行ThreadPoolExecutor,但不直接向它提供任务-而是将它们提供给一个单独的无限制LinkedBlockingQueue。然后另一个线程从LinkedBlockingQueue中提取并将其提供给执行程序,如果被拒绝,则简单地重试,直到未被拒绝。虽然这似乎有点麻烦和有些hack,但是否有更清洁的方法?

4个回答

4

不需要像要求的那样微观管理线程池。

缓存线程池将重用空闲线程,同时允许潜在的无限并发线程。当然,在突发期间由于上下文切换开销可能导致性能失控。

Executors.newCachedThreadPool();

更好的选择是设置线程总数限制,而不再考虑使用空闲线程。配置更改如下:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

推理这种情况,如果执行器的线程数小于 corePoolSize ,那么它一定不会非常繁忙。如果系统不是非常繁忙,那么启动一个新线程就没有太大的危害。这将导致您的 ThreadPoolExecutor 总是在允许的最大工作线程数量下创建新的工作线程。只有当达到最大工作线程数时,等待任务的工作线程才会被分配任务。如果一个工作线程在没有任务的情况下等待了 aReasonableTimeDuration 时间,则可以终止。使用合理的池大小限制(毕竟,只有这么多个 CPU)和相当长的超时时间(以防止线程不必要地终止),可能会看到预期的好处。
最后一个选项很“hackish”。基本上,ThreadPoolExecutor 内部使用 BlockingQueue.offer 来确定队列是否有容量。自定义实现的 BlockingQueue 可以始终拒绝 offer 尝试。当 ThreadPoolExecutor 无法将任务 offer 到队列时,它将尝试创建一个新的工作线程。如果无法创建新的工作线程,将调用 RejectedExecutionHandler。此时,自定义的 RejectedExecutionHandler 可以强制将任务放入自定义的 BlockingQueue 中。
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}

1

只需设置 corePoolsize = maximumPoolSize 并使用无界队列?

在您的要点列表中,1排除2,因为 corePoolSize 总是小于或等于 maximumPoolSize

编辑

您想要的和 TPE 将提供给您的仍然存在不兼容之处。

如果您有一个无限制的队列,则 maximumPoolSize 被忽略,因此,正如您观察到的那样,最多只会创建并使用 corePoolSize 个线程。

因此,如果您使用带有无界队列的 corePoolsize = maximumPoolSize,您将得到所需的结果,对吧?


糟糕,我写的不是我想要的。我编辑了原始内容。 - Joe K
将corePoolsize设置为maximumPoolSize确实很接近,但我还使用了allowCoreThreadTimeOut(false)和prestartAllCoreThreads()。 - Joe K


1

你的使用情况很常见,完全合法,但比人们预期的更加困难。有关背景信息,您可以阅读此讨论并找到指向解决方案的指针(也在该线程中提到)这里。Shay的解决方案很好用。

通常我会对无限队列有点谨慎;最好具有明确的传入流量控制,以优雅地降级并调节当前/剩余工作的比率,以不超负荷生产者或消费者。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接