Java中的并发和阻塞队列

28

我有一个线程将事件推送到第二个线程的入站队列中,这是一个经典问题。但这次,我非常关心性能。我想要实现以下目标:

  • 我需要对队列进行并发访问,生产者推送,接收者弹出。
  • 当队列为空时,我希望消费者阻塞在队列上,等待生产者。

我的第一个想法是使用LinkedBlockingQueue,但我很快意识到它不是并发的,性能受到了影响。另一方面,我现在使用的是ConcurrentLinkedQueue,但仍然需要在每次发布时支付wait()/notify()的成本。由于消费者在发现队列为空时不会阻塞,所以我必须同步并在锁上wait()。另一方面,生产者必须获得该锁,并在每个单独的发布上notify()。总体而言,即使不需要,在每个单独的发布中都要支付sycnhronized (lock) {lock.notify()}的成本。

我猜想这里需要的是既可以阻塞又可以并发的队列。我想象中的push()操作工作方式与ConcurrentLinkedQueue相同,当推送的元素是列表中的第一个元素时,会额外发出notify()。我认为在ConcurrentLinkedQueue中已经存在这样的检查,因为推送需要连接下一个元素。因此,这比每次在外部锁上同步要快得多。

是否有类似这样的可用/合理的队列?


你为什么认为 java.util.concurrent.LinkedBlockingQueue 不是并发的?我认为它非常并发,因为看了它的 javadoc 和源代码。但关于性能方面我不确定。 - Rorick
请参见https://dev59.com/EHM_5IYBdhLWcg3wn0lO。 - Vadzim
6个回答

12

我认为你可以坚持使用java.util.concurrent.LinkedBlockingQueue,无论你有何疑虑。它是并发的。尽管如此,我不清楚它的性能如何。也许其他实现BlockingQueue的方法更适合你。这样的方法并不太多,所以进行性能测试并进行测量。


我只是观察到我的吞吐量与ConcurrentLinkedQueue相比减少了大约8倍。我猜测它在内部锁定整个东西以提供线程安全性。不过你说得对,它可能只是与ConcurrentLinkedQueue相比性能更差。这有点违背初衷啊;-) - Yiannis
1
无论你使用哪个队列,最终都需要用锁来支持等待新条目(除非你忙等待)。锁本身并不是很昂贵(锁大约需要0.5微秒),所以如果出现性能问题,可能是因为设计存在问题,例如创建较少的任务/找到一种方法添加较少的对象到队列/批量处理工作。 - Peter Lawrey
7
请注意,如果您希望获得更高的吞吐量,请在您的 LinkedBlockingQueue 上使用 drainTo() 方法。我们在其中一个应用程序上测试发现,与逐个 take() 元素相比,使用 drainTo() 方法可以将吞吐量提高近500%。请注意不改变原意,使内容更加通俗易懂。 - nos

6
类似于此答案https://dev59.com/j3M_5IYBdhLWcg3w02z8#1212515,但有些不同。我最终使用了ExecutorService。您可以通过使用Executors.newSingleThreadExecutor()来实例化它。我需要一个并发队列来读取/写入BufferedImages到文件,并具有读写的原子性。因为文件IO比源IO快几个数量级,所以我只需要一个线程。此外,我更关心操作的原子性和正确性而非性能,但是这种方法也可以用多个线程在池中完成以加速事情。
获取图像(省略Try-Catch-Finally):
Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
    @Override
        public BufferedImage call() throws Exception {
            ImageInputStream is = new FileImageInputStream(file);
            return  ImageIO.read(is);
        }
    })

image = futureImage.get();

保存图像(省略了Try-Catch-Finally):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
    @Override
    public Boolean call() {
        FileOutputStream os = new FileOutputStream(file); 
        return ImageIO.write(image, getFileFormat(), os);  
    }
});

boolean wasWritten = futureWrite.get();

重要提示:在finally块中刷新和关闭流。我不确定它与其他解决方案相比的性能如何,但它非常灵活。

5
我建议您查看ThreadPoolExecutor的newSingleThreadExecutor。它将为您处理任务顺序,并且如果您向执行程序提交Callables,您还将能够获得所需的阻塞行为。

4
你可以尝试使用jsr166中的LinkedTransferQueue:http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/ 它能够满足你的需求,并且对于offer/poll操作的开销较小。 从代码中我可以看出,当队列不为空时,它使用原子操作来轮询元素。当队列为空时,如果未成功,则会旋转一段时间并停止线程。 我认为它可以在你的情况下有所帮助。

3

每当我需要在不同的线程之间传递数据时,我会使用ArrayBlockingQueue。通过使用put和take方法(如果队列已满或为空则会阻塞),可以实现此目的。


2
这里是实现BlockingQueue的类的列表
就像@Rorick在评论中提到的那样,我相信所有这些实现都是并发的。我认为你对LinkedBlockingQueue的担忧可能是不必要的。 我建议查看SynchronousQueue

1
SynchronousQueue 不是我想要的,因为每次尝试发布时它都会阻止我的生产者。 - Yiannis
SynchronousQueue看起来更像是一个管道,而不是队列。它似乎不能包含待处理的任务,只能从生产者向消费者“推送”单个任务。 - Rorick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接