生产者消费者 - ExecutorService 和 ArrayBlockingQueue

3
我希望通过使用ExecutorService和ArrayBlockingQueue来了解我对生产者消费者设计的理解是否正确。我知道有不同的实现方法,但我猜最终这取决于问题本身。
我需要解决的问题是:我有一个读取大文件(6GB)的生产者;它逐行读取并将每行转换为对象,然后将对象放入ArrayBlockingQueue中。
消费者(少数)从ArrayBlockingQueue中获取对象,并将其保存到数据库中。
很明显,生产者比消费者快得多;将每行转换为对象只需要几分之一秒,但对于消费者来说需要更长时间。
因此,如果我想通过以下方式加速此过程:我创建了两个类'ProducerThread' 和 'ConsumerThread',它们共享ArrayBlockingQueue。协调它们的线程如下:
@Override
public void run()
{
    try{

        ArrayBlockingQueue<Ticket> queue = new ArrayBlockingQueue<Ticket>(40);
        ExecutorService threadPool = Executors.newFixedThreadPool(8);

        threadPool.execute(new SaleConsumerThread("NEW YORK", queue)); 
        threadPool.execute(new SaleConsumerThread("PARIS", queue));
        threadPool.execute(new SaleConsumerThread("TEL AVIV", queue));
        threadPool.execute(new SaleConsumerThread("HONG KONG", queue));
        threadPool.execute(new SaleConsumerThread("LONDON", queue));
        threadPool.execute(new SaleConsumerThread("BERLIN", queue));
        threadPool.execute(new SaleConsumerThread("AMSTERDAM", queue));

        Future producerStatus = threadPool.submit(new SaleProducerThread(progressBar, file, queue)); 
        producerStatus.get(); 
        threadPool.shutdown();   

    }catch(Exception exp)
    {
        exp.printStackTrace();
    }
}

我的问题是:

  1. 上述设计是否会同时使用每个线程?我的计算机是双2.4GHz四核。

  2. 我不确定Future和.get()的作用是什么?

顺便说一下,结果非常快(考虑到第一个版本是顺序的,花了3小时),现在只需要大约40分钟(但也许还有提升的空间)。

感谢任何提示。

2个回答

2

我建议您查看等待IO的时间和CPU使用率的情况。我怀疑您的主要瓶颈在于数据库,您需要考虑如何使导入更加高效。您可以尝试批量更新,因为这可以提高吞吐量。


1
+1 信号线程添加一行文本到数据库并不会提高效率。如果只使用一个线程,性能不会变得更糟,我会感到惊讶。批量处理行和/或BULK INSERT和/或存储过程将是更好的优化方法。 - Martin James

1

回答:

  1. 我不确定您所说的“同时使用每个线程”的意思。但是所有线程肯定可以并发执行。您的性能将取决于您有多少线程以及数据如何分区。您可以尝试使用更多线程来获得更好的结果,而不是按城市分配线程,也许您可以使用记录编号,只需将每个线程分配给记录编号的模数即可。假设您有10个线程,记录1、11、21等将进入线程1,2、22等将进入线程2。这样,您将获得相同数量的事务每个线程,因此直到完成为止,您将充分利用线程。
  2. Future是为了在事件完成时允许代码阻塞。在这种情况下,get方法返回SaleProducerThread的结果。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接