我希望通过使用ExecutorService和ArrayBlockingQueue来了解我对生产者消费者设计的理解是否正确。我知道有不同的实现方法,但我猜最终这取决于问题本身。
我需要解决的问题是:我有一个读取大文件(6GB)的生产者;它逐行读取并将每行转换为对象,然后将对象放入ArrayBlockingQueue中。
消费者(少数)从ArrayBlockingQueue中获取对象,并将其保存到数据库中。
很明显,生产者比消费者快得多;将每行转换为对象只需要几分之一秒,但对于消费者来说需要更长时间。
因此,如果我想通过以下方式加速此过程:我创建了两个类'ProducerThread' 和 'ConsumerThread',它们共享ArrayBlockingQueue。协调它们的线程如下:
我需要解决的问题是:我有一个读取大文件(6GB)的生产者;它逐行读取并将每行转换为对象,然后将对象放入ArrayBlockingQueue中。
消费者(少数)从ArrayBlockingQueue中获取对象,并将其保存到数据库中。
很明显,生产者比消费者快得多;将每行转换为对象只需要几分之一秒,但对于消费者来说需要更长时间。
因此,如果我想通过以下方式加速此过程:我创建了两个类'ProducerThread' 和 'ConsumerThread',它们共享ArrayBlockingQueue。协调它们的线程如下:
@Override
public void run()
{
try{
ArrayBlockingQueue<Ticket> queue = new ArrayBlockingQueue<Ticket>(40);
ExecutorService threadPool = Executors.newFixedThreadPool(8);
threadPool.execute(new SaleConsumerThread("NEW YORK", queue));
threadPool.execute(new SaleConsumerThread("PARIS", queue));
threadPool.execute(new SaleConsumerThread("TEL AVIV", queue));
threadPool.execute(new SaleConsumerThread("HONG KONG", queue));
threadPool.execute(new SaleConsumerThread("LONDON", queue));
threadPool.execute(new SaleConsumerThread("BERLIN", queue));
threadPool.execute(new SaleConsumerThread("AMSTERDAM", queue));
Future producerStatus = threadPool.submit(new SaleProducerThread(progressBar, file, queue));
producerStatus.get();
threadPool.shutdown();
}catch(Exception exp)
{
exp.printStackTrace();
}
}
我的问题是:
上述设计是否会同时使用每个线程?我的计算机是双2.4GHz四核。
我不确定Future和.get()的作用是什么?
顺便说一下,结果非常快(考虑到第一个版本是顺序的,花了3小时),现在只需要大约40分钟(但也许还有提升的空间)。
感谢任何提示。