生产者消费者 - 使用Executors.newFixedThreadPool

7
我对生产者-消费者模式的理解是,可以使用一个队列来实现生产者和消费者之间的共享。生产者将工作提交到共享队列中,消费者获取并处理它。也可以通过生产者直接向消费者提交来实现(生产者线程直接向消费者的执行器服务提交)。
现在,我一直在研究Executors类,它提供了一些常见的线程池实现。据规范介绍,newFixedThreadPool方法“重用一组固定数量的线程,并且这些线程以无界队列为基础运行”。这里所说的队列是哪个?
如果生产者直接向消费者提交任务,那么它是否是ExecutorService的内部队列,其中包含了可运行的任务列表?
或者,如果生产者提交给共享队列,那么它是否是中间队列?
也许我错过了整个重点,但请有人澄清一下吗?
3个回答

4
您是正确的,ExecutorService不仅是一个线程池,而且它是完整的生产者-消费者实现。这个内部队列实际上是一个线程安全的Runnable队列(准确来说是FutureTask),保存了您submit()的任务。
池中的所有线程都被阻塞在那个队列上,等待执行任务。当您submit()一个任务时,恰好有一个线程会拿起它并运行它。当然,submit()不会等待池中的线程完成处理。
另一方面,如果您提交大量任务(或长时间运行的任务),您可能会发现池中的所有线程都在占用,队列中有一些任务在等待。一旦任何线程完成其任务,它将立即从队列中获取第一个任务。

只是为了澄清:ExecutorService 只是一个接口。你可以使用一个类来实现 ExecutorService,该类会在提交时立即在同一线程中运行每个可运行对象(我相信在 java.util.concurrent 包中有这样的实现)。但是,在实践中,大多数 ExecutorService 实现都是完整的生产者-消费者实现。 - Daniel Pryden
你是完全正确的,我所指的“ExecutorService”是指“由Executors.newFixedThreadPool()返回,并实现了ExecutorService接口的那个东西”。感谢澄清。 - Tomasz Nurkiewicz
1
谢谢大家。如果我使用newFixedThreadPool(8)创建一个执行器服务,然后在其上执行约1000个可运行任务,请确认我的情况理解:1.最多会创建8个线程 2.在处理开始时,当8个线程忙碌时,992个任务将被保留在内部队列中 3.此外,因为它是无界队列,所以对于提交到执行器服务的任务数量没有上限。如果我使用有界队列创建ExecutorService,以上情况会产生什么影响?它会表现得更好吗?谢谢,O。 - Oxford
1
@Oxford:你说得对。关于有界队列:如果队列有一个限制(比如100个条目),那么一旦达到100个条目,任何尝试submit()新的Runnable都会被阻塞,直到至少有一个任务完成并且队列中再次有空间。这对于限制生产者线程的速率非常有用。在某些情况下,您不希望生产者线程永远被阻塞,在这种情况下,您可能需要一个无界队列。 - Daniel Pryden
@Daniel,submit()方法不会抛出InterruptedException,因此它不会阻塞消费者。然而,如果队列已满,任务将被委托给RejectedExecutionHandler处理。 - GauravJ

1
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  

0
看这个:检查一下:
在Java中的生产者-消费者示例(RabbitMQ)(它是为另一个库编写的,但是它是用Java编写的,并且清楚地演示了概念;)
希望能帮到你!

P.S.:实际上,它有几个示例,但你可以理解的;)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接