如何管理M个线程(每个任务1个)以确保同时只有N个线程。其中N < M。使用Java实现。

5

我有一个Java任务队列,这个队列在数据库表中。

我需要:

  • 每个任务仅使用1个线程
  • 同时运行的线程不超过N个。这是因为线程需要与数据库交互,我不希望打开大量的数据库连接。

我认为我可以这样做:

final Semaphore semaphore = new Semaphore(N);
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        final CountDownLatch cdl = new CountDownLatch(tasks.size());
        for (final JobTask task : tasks) {
            Thread tr = new Thread(new Runnable() {

                @Override
                public void run() {
                    semaphore.acquire();
                    task.doWork();
                    semaphore.release();
                    cdl.countDown();
                }

            });
        }
        cdl.await();
    }
}

我知道存在ExecutorService类,但不确定能否用它来完成此任务。

那么,你认为这是最好的解决方法吗?或者你能否解释一下ExecutorService如何工作以解决这个问题?

最终解决方案:

我认为最好的解决方案是:

while (isOnJob) {
    ExecutorService executor = Executors.newFixedThreadPool(N);
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        for (final JobTask task : tasks) {
            executor.submit(new Runnable() {

                @Override
                public void run() {
                    task.doWork();
                }

            });
        }
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

非常感谢您的回答。顺便说一下,我正在使用连接池,但是与数据库的查询非常繁重,我不希望同时有不受控制的任务数量。

4个回答

7
您确实可以使用ExecutorService。例如,使用newFixedThreadPool方法创建一个新的固定线程池。这样,除了缓存线程之外,您还可以保证最多只有n个线程同时运行。
类似以下内容:
```java ExecutorService executor = Executors.newFixedThreadPool(n); ```
请注意,此处为示例代码,并非需要翻译内容的一部分。
private static final ExecutorService executor = Executors.newFixedThreadPool(N);
// ...
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final JobTask task : tasks) {
                Future<?> future = executor.submit(new Runnable() {    
                        @Override
                        public void run() {
                                task.doWork();
                        }
                });
                futures.add(future);
        }
        // you no longer need to use await
        for (Future<?> fut : futures) {
          fut.get();
        }
    }
}

请注意,您不再需要使用锁存器,因为如果需要,get将等待计算完成。

所以看起来我也不需要信号量,是吗? - user2427

4

我同意JG的观点,ExecutorService是正确的选择...但我认为你们两个都把它搞得比必要的还要复杂。

与其创建大量的线程(每个任务1个),为什么不只创建一个固定大小的线程池(使用Executors.newFixedThreadPool(N)),并将所有任务提交给它呢?不需要信号量或任何其他东西——只需在获取它们时将作业提交到线程池中,线程池将使用最多N个线程处理它们。

如果您一次不会使用超过N个线程,那么为什么要创建它们呢?


1

使用一个具有无限队列和固定线程最大数量的ThreadPoolExecutor实例,例如Executors.newFixedThreadPool(N)。这将接受大量任务,但只会同时执行N个任务。

如果您选择一个有界队列(容量为N),则Executor将拒绝执行任务(具体取决于您在直接使用ThreadPoolExecutor而不是使用Executors工厂时可以配置的策略 - 请参见RejectedExecutionHandler)。

如果您需要“真正”的拥塞控制,应该设置一个容量为N的绑定BlockingQueue。从数据库中获取要完成的任务并将它们put到队列中 - 如果队列已满,则调用线程将阻塞。在另一个线程中(也许也是使用Executor API启动的),您可以从BlockingQueuetake任务并将其提交给Executor。如果BlockingQueue为空,则调用线程也会阻塞。要发出完成信号,请使用“特殊”对象(例如标记队列中最后/最终项目的单例)。

0
获得良好的性能也取决于线程中需要完成的工作类型。如果您的数据库是处理的瓶颈,我建议您开始关注线程如何访问数据库。使用一个连接池可能会有所帮助。这可以帮助您实现更高的吞吐量,因为工作线程可以从池中重新使用数据库连接。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接