Java - ExecutorService有最大大小限制

3
有没有办法在巨大的数据库中并行地应用一些作业?我尝试使用ExecutorService,但我们必须shutdown()才能知道池的大小...
所以我的最佳解决方案是:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 100;
    int MYTHREADS = 10;
    int loopIndex = 0;
    boolean bContinue=true;
    Runnable worker;



    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;
        ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }
}



public static class MyRunnable implements Runnable {

    private final String id;

    MyRunnable(String id) {
        this.id = id;
    }

        @Override
        public void run()
        {
            System.out.println("Thread '"+id+"' started");
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread '"+id+"' stopped");
        }
    }
}

这个工作很顺利,但缺点是在每个循环结束时我需要等待最后一个线程完成。
例如:当只有3个线程正在运行时...
为了解决这个问题,我做了以下操作,但这样做是否“安全”/正确?
顺便问一下:有没有办法知道队列中有多少任务/线程?
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executor.execute(worker);
             }

            while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
                System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                        " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
                );
                TimeUnit.MILLISECONDS.sleep(500);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executor.shutdown();
    // Wait until all threads are finish
    while (!executor.isTerminated()) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

编辑:

我尝试启动1个或100万个任务,因此(我认为)不能将它们全部放入队列中...这就是为什么我使用全局执行器,以便始终可以在队列中拥有一些线程(为此,我不能关闭执行器,否则它将无法使用)。

最新代码版本:

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executorPool.execute(worker);
             }

            while (executorPool.getActiveCount() >= MYTHREADS  || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2)) 
            {
                System.out.println("Pool size is now " + executorPool.getActiveCount()+
                                        " - queue size: "+ executorPool.getQueue().size()
                );

                if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
                    System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
                    break;
                }

                TimeUnit.MILLISECONDS.sleep(2000);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executorPool.shutdown();
    // Wait until all threads are finish
    while (!executorPool.isTerminated()) {
        System.out.println("Pool size is now " + executorPool.getActiveCount()+
                " - queue size: "+ executorPool.getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

感谢您的提前帮助。

你可以使用invokeAll()来等待线程完成。请参考:https://dev59.com/KnA75IYBdhLWcg3wfpGr#36699136 - Ravindra babu
3个回答

9

更新

现在我清楚了您的主要关注点是无法一次提交1000万个任务。

不用担心,您可以将它们全部提交到执行器中。实际并行运行的任务数量受底层线程池大小的限制。也就是说,如果您有一个大小为2的线程池,那么每次只有两个任务被执行,其余任务会在队列中等待空闲线程。

默认情况下,Executors.newFixedThreadPool() 创建具有 Integer.MAX_VALUE 大小队列的 Executor,因此您的数百万个任务都可以放在那里。


您可以使用 ExecutorService.submit() 方法返回 Future。然后您可以检查 Future 任务的状态(即使用 isDone()isCancelled() 方法)。

Executor 通常是您不想显式关闭的东西,并且在整个应用程序生命周期内存在。通过这种方法,您无需关闭即可知道有多少任务在等待执行。

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
    Future<?> task = executorService.submit(() -> {
        // do work
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " task are still pending");

此外,请注意任务(tasks)和线程(threads)不是可以互换的术语。在您的情况下,执行程序有固定数量的线程。您可以提交更多的任务,但剩余的任务将会等待直到有空闲的线程来运行它们。请留意,此处保留了HTML标签。

可能是个好主意...然后我只需要添加一个"等待循环",以便在运行少于X个任务时分配更多任务... - Bast
和@Pavan一样的问题,与我的解决方案相比,您的解决方案有什么好处(请参见EDIT后的最新代码)? - Bast
好的,现在我明白了你的意思并更新了答案。我认为你正在付出很大的努力来模拟一些执行器已经提供的东西——队列。 - David Siro
谢谢你,David。但是,如果提交了1000万个任务,执行器对象不会很大吗(即:需要大量的RAM)? - Bast
你为什么不亲自试一下呢?我刚刚快速运行了1000万个System.out任务,消耗的内存量是800MB... - David Siro
1
当然了,你的更新向我展示了队列可以管理如此大量的任务!谢谢! - Bast

0

ExecuterService允许您调用可以并行运行的任务列表,并在结果可用时返回结果。

在您的代码中,您正在使用

worker = new MyRunnable(id);
executor.execute(worker);

不要使用Runnable,在这种情况下最好使用Callable,然后您可以将Callable列表提交给单个API以执行,而无需使用for循环。

List<Callable> workers = new ArrayList<>();
workers.add(new MyCallable(id)) // this is just for example
workers.add(new MyCallable(id))
workers.add(new MyCallable(id))

List<Future<Boolean>> futures = executor.invokeAll(workers); // this will execute all worker tasks parallely and return you future object list using which you can see whether worker thread is completed or not and also the what is the return value.

请注意,Future 对象上的 get 方法是阻塞调用。

invokeAll也是阻塞的,所以最初的问题(需要在每个循环中等待最后一个线程)并没有得到解决。 :) - Bast
@Bast - 根据我的理解,invokeAll不是阻塞调用。 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection) - Pavan
你说得对,只有 future.get() 是阻塞的... 我可能可以使用你的解决方案,就像 @DavidSiro 提出的那个一样... - Bast
请尝试留下您的反馈,这可能会帮助其他人。谢谢。 - Pavan
与我的解决方案相比,您的解决方案有何优势(请参阅EDIT后的最新代码)? - Bast
显示剩余2条评论

0

ExecutorService中检查任务完成时,您无需知道线程池的大小。在提交任务后,可以删除您的代码。

选项1:

  1. 将ThreadPoolExecutor替换为Executors中的newWorkStealingPool

    使用所有可用处理器作为目标并行级别创建工作窃取线程池。

    这将允许更好地利用ExecutorService中的线程。

    ExecutorService executor = Executors.newWorkStealingPool();
    
  2. 使用invokeAll

选项2:(如果您提前知道任务数量很有用)

使用CountDownLatch并将计数器初始化为要提交的任务数。

更多参考资料:

在Java中等待所有线程完成工作

如何正确关闭Java ExecutorService


是的,但因为它是一个while循环,我想动态添加新线程,以便始终有一些处于“队列”中... 实际上使用getActiveCount()更正确(代码已更新)——我现在甚至在本地代码中切换到了ThreadPoolExecutor。 - Bast
请注意,在我的第二段代码(即:“solution”)中,ExecutorService是全局的,因此如果不关闭它,它将无法再使用。 - Bast
在 while 循环之外,您可以按照上述帖子中引用的顺序使用 shutdown、shutdownNow 和 awaitTermination API 来保留关闭代码。 - Ravindra babu
是的,shutdown()已经在我的代码循环外面了...问题是我需要启动1或10000000个任务,所以(我猜)我不能把它们全部放入队列中...我认为使用全局执行器可能是一种解决方法,以便始终有一些线程在队列中...我将编辑我的帖子并添加我正在运行的最新代码。 - Bast
使用invokeAll()并将ThreadPoolExecutor替换为newWorkStealingPool - Ravindra babu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接