线程池多重限制

5
我需要一个线程池,最多提供X个线程来处理任务。但是,每个提交的任务可以指定一个特定的IO目标(比如说Y),并且受到限制。
例如,一个提交的IOTask指定了目标为"google.com",限制为4(Y),而线程池有一个全局限制为16(X)。我想要提交10个google.com-tasks,其中只有4个并行处理,线程池还有12个空闲线程可用于其他任务。
我该如何实现这一点?
6个回答

4
实现这个功能并不简单,因为你要么需要针对每个目标设置单独的队列(使等待代码变得更加复杂),要么从一个队列中跳过已满容量的目标(会增加性能开销)。你可以尝试扩展ExecutorService来实现此功能,但扩展看起来并不容易。
更新的答案/解决方案:
在更深入思考后,最简单的解决阻塞问题的方法是同时拥有阻塞队列(与正常情况下一样)以及队列映射(每个目标一个队列,以及每个目标可用线程数的计数)。映射队列仅用于已被放弃执行(由于该目标已经运行了太多线程)的任务,在从常规阻塞队列中获取任务后使用该队列。
因此,执行流程如下:
1.调用代码提交任务(带有特定目标)。 2.将任务放入阻塞队列中(很可能在这里包装自己的任务类,其中包含目标信息)。 3.线程(来自线程池)正在等待阻塞队列(通过take())。 4.线程获取提交的任务。 5.线程同步锁。 6.线程检查该目标的可用计数。 7.如果可用计数> 0, - 那么线程将计数减1,释放锁,运行任务。 - 否则,线程将任务放入目标到任务队列的映射中(此映射为“已忽略任务映射”),释放锁,并返回等待阻塞队列。 8.当线程执行完任务时: - 同步锁。 - 检查刚刚执行的目标计数。 - 如果计数== 0, - 然后检查是否存在该目标的已跳过任务映射中的任何任务,如果存在,则释放锁并运行它。 - 如果计数不为0或不存在相同目标的任务在已忽略的映射/队列上,则增加可用计数(对于该目标),释放锁,并返回等待阻塞队列。 这种解决方案避免了任何重大的性能开销或仅用于管理队列的单独线程。

2
您可以将两个ExecutorService实例封装在一个自定义类中,并手动管理任务提交,如下所示:
class ExecutorWrapper {

    private ExecutorService ioExec = Executors.newFixedThreadPool(4);
    private ExecutorService genExec = Executors.newFixedThreadPool(12);

    public Future<?> submit(final IOTask task) {
        return ioExec.submit(task);
    }

    public Future<?> submit(final Runnable task) {
        return genExec.submit(task);
    }
}

interface IOTask extends Runnable {}

这样可以使用4个线程来执行IO操作,留下其他12个线程来处理其他任务。


那基本上就是我所采取的方法。 - hotzen

2

思考一些更具体的答案。

  1. 您需要自己的BlockingQueue,它可以将不同类型的任务分开,并根据内部计数器返回所需的Runnable。

  2. 扩展ThreadPoolExecutor并实现beforeExecute和afterExecute。

当调用beforeExecute时,如果Runnable是类型X,则会在队列中增加一个计数器。 当调用afterExecute时,它将减少该计数器。

在您的队列中,根据计数器的值返回适当的Runnable,我认为take方法是您这样做的地方。

这里有一些同步问题必须完全考虑,以确保计数器永远不会超过4。 不幸的是,一旦进入beforeExecute,就太晚了,但是能够简单地知道何时运行多少个任务可能会让您开始。


1
嗯...恐怕现有的ExecutorService不允许如此细粒度的控制。你可能需要扩展ExecutorService类来自己添加这个功能,或者使用两个独立的固定线程池,一个容量为4,另一个容量为12。

1
一个想法是扩展ExecutorService,在你的类中有两个线程池,一个容量为4,另一个容量为12。
然后实现你需要的方法,并根据提交的IOTasks将任务分配到你想要的池中。

1
使用计数器来统计线程总数和哈希映射,该映射计数当前正在尝试访问X网站的线程数量。当您想要启动新线程时,请调用一个同步方法,该方法检查等待(在while循环中使用wait())直到哈希映射中的线程数少于4且总线程数少于16。然后增加两个计数器并启动线程。当线程完成时,它应调用第二个同步方法,该方法减少计数器并调用notify()。

但这意味着,试图启动新线程的操作本身被wait/notify阻塞,直到线程可用。我想尽量减少使用的线程数。 - hotzen
是的,你说得对。这是一个不好的回答。如果我可以的话,我会给它点个踩。 - dspyz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接