ThreadPoolExecutor和队列

7

我认为使用ThreadPoolExecutor,我们可以将Runnable提交到构造函数中传递的BlockingQueue中或使用execute方法执行。
我理解的是,如果有任务可用,它将被执行。
我不理解的是以下内容:

public class MyThreadPoolExecutor {  

    private static ThreadPoolExecutor executor;  

    public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){  
        executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue);   
        //executor.prestartAllCoreThreads();  
    }  

    public static void main(String[] main){
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
        final String[] names = {"A","B","C","D","E","F"};  
        for(int i = 0; i < names.length; i++){  
            final int j = i;  
            q.add(new Runnable() {  

                @Override  
                public void run() {  
                    System.out.println("Hi "+ names[j]);  

                }  
            });         
        }  
        new MyThreadPoolExecutor(10, 20, 1, q);   
        try {  
            TimeUnit.SECONDS.sleep(5);  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        /*executor.execute(new Runnable() {  

            @Override  
            public void run() {  

                System.out.println("++++++++++++++");  

            }   
        });  */
        for(int i = 0; i < 100; i++){  
            final int j = i;  
            q.add(new Runnable() {   

                @Override  
                public void run() {  
                    System.out.println("Hi "+ j);  

                }  
            });  
        }   


    }  


}

这段代码除非我在构造函数中取消注释executor.prestartAllCoreThreads();,或者调用打印System.out.println("++++++++++++++");的可运行对象的execute(也被注释掉),否则不会有任何作用。
为什么呢?
引用(我强调的):
默认情况下,即使核心线程也仅在到达新任务时才被创建和启动,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()来动态覆盖此行为。如果您使用一个非空队列构造池,则可能需要预启动线程。
好吧。我的队列不是空的。但是我创建了executor,我执行了sleep,然后将新的Runnable添加到队列中(在循环中到100)。
这个循环不算作“到达新任务”吗?为什么它不能起作用?我必须要么prestart,要么显式地调用execute吗?

1
当任务通过execute到达时,工作线程将被生成,并且这些线程是与底层工作队列交互的线程。如果您从非空工作队列开始,则需要预先启动工作线程。请参阅OpenJDK 7中的实现 - obataku
@veer:是的,我知道。但我是从非空队列开始,然后再开始向队列中添加元素。为什么我需要预启动工作线程?为什么每次都要查看实现细节?是规范有误还是我的理解有误?那么哪一种情况呢? - Cratylus
工作者是直接与队列交互的人。只有在通过execute(或其上层,例如invokeAllsubmit等)传递时才会按需生成它们。请参见此处的execute代码(http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7-b147/java/util/concurrent/ThreadPoolExecutor.java#1300)。 - obataku
@veer:当我预启动工作进程时,代码可以正常运行而无需调用“execute”。 - Cratylus
2
是的,那是因为工人已经开始工作了,因此他们能够检查队列中是否有可用的工作。 - obataku
我曾认为使用ThreadPoolExecutor可以将Runnable提交到构造函数中传递的BlockingQueue中执行,或者使用execute方法执行。但这是错误的,创建线程池后,您应该只使用execute方法提交新任务。 - augurar
2个回答

11

当通过execute方法到达任务时,工作线程将被生成,这些线程与底层工作队列进行交互。如果您从非空的工作队列开始,则需要预先启动工作线程。请参见OpenJDK 7中的实现

我重申一遍,工作线程是与工作队列进行交互的线程。只有在通过execute传递时(或其上面的层,例如invokeAll、submit等)才会按需生成它们。如果它们没有启动,那么无论您向队列中添加多少工作,都不会有任何检查,因为没有启动任何工作线程

ThreadPoolExecutor直到必要时或者通过方法prestartAllCoreThreadsprestartCoreThread预先创建工作线程。如果没有启动任何工作线程,那么您队列中的任何工作都无法完成。

添加初始的execute方法的原因是它强制创建一个唯一的核心工作线程,然后可以开始处理您队列中的工作。您还可以调用prestartCoreThread并获得类似的行为。如果要启动所有工作线程,则必须调用prestartAllCoreThreads或通过execute方法提交相应数量的任务。

请参见下面的execute代码。

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

你没有回答关于规范的问题,而是通过提供JVM实现的细节来回答(可能对于Oracle或IBM等公司有所不同)。 - Cratylus
工作线程会在任务到达时被生成并执行。如果工作线程已经启动,我可以直接将任务添加到队列中,而不需要调用“execute”。 - Cratylus
我非常明确地放置了一个实现的实现细节(实际上与Oracle JDK中的相同)。 OP是代码按照其方式工作的原因,我已经解释了。 ThreadPoolExecutor直到必要时或通过方法[prestartAllCoreThreads](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#prestartAllCoreThreads())和[prestartCoreThread](http:// docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#prestartCoreThread())预先启动核心线程才会生成工作线程。 - obataku
如果你在调用execute后将工作添加到队列中,并且它可以正常工作,那么你已经成功地创建了一个“单一”的工作线程,它独立于处理工作队列。 - obataku
2
只要有工作线程等待消费任务,你可以通过队列进行交互,这也是为什么文档建议你提前启动它们以处理非空的工作队列。通过调用 execute 一次来“准备”执行器只会启动一个工作线程,而不是整个线程池。 - obataku
显示剩余2条评论

5

BlockingQueue不是一个神奇的线程调度程序。如果您将Runnable对象提交到队列中,而没有运行的线程来消耗这些任务,它们当然不会被执行。另一方面,如果需要,execute方法将根据线程池配置自动分派线程。如果预先启动了所有核心线程,则会有线程从队列中消耗任务。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接