如何使用ConcurrentLinkedQueue?

103
我该如何在Java中使用ConcurrentLinkedQueue
使用这个LinkedQueue,我需要担心队列中的并发问题吗?还是我只需要定义两个方法(一个用于从列表中检索元素,另一个用于向列表中添加元素)?
注意:显然,这两个方法必须进行同步。对吗?

编辑:我想要做的是这样的:我有一个类(在Java中),其中一个方法用于从队列中检索项目,另一个类具有一个方法用于向队列中添加项目。添加和检索列表中的项目是我的自定义类的对象。

还有一个问题:我需要在remove方法中这样做吗:

while (queue.size() == 0){ 
  wait(); 
  queue.poll();
}

我只有一个消费者和一个生产者。


感谢您对我的问题的回复。 我想要做的是:我有一个类(用Java编写),其中一个方法用于从队列中检索项目,另一个类具有一个方法用于向队列中添加项目。添加到列表中的项目和检索的项目都是我自己的类的对象。 - Ricardo Felgueiras
2
您应该编辑您的问题,并在问题本身中加入这个澄清。 - Adam Jaskiewicz
6个回答

165
不需要同步方法,也不需要定义任何方法;它们已经在ConcurrentLinkedQueue中了,只需使用即可。 ConcurrentLinkedQueue会在内部执行所有锁定和其他操作;您的生产者将数据添加到队列中,您的消费者则会轮询它。
首先,创建您的队列:
Queue<YourObject> queue = new ConcurrentLinkedQueue<YourObject>();

现在,无论您在哪里创建生产者/消费者对象,请传递队列,以便它们有地方放置其对象(您也可以使用setter进行此操作,但我更喜欢在构造函数中执行此类操作):
YourProducer producer = new YourProducer(queue);

并且:

YourConsumer consumer = new YourConsumer(queue);

并在生产者中添加内容:

queue.offer(myObject);

从您的消费者中取出内容(如果队列为空,则poll()将返回null,请检查它):

YourObject myObject = queue.poll();

如需更多信息,请参见Javadoc

编辑:

如果您需要阻塞等待队列不为空,您可能需要使用LinkedBlockingQueue并使用take()方法。但是,LinkedBlockingQueue具有最大容量(默认为Integer.MAX_VALUE,超过20亿),因此根据您的情况可能或可能不适用。

如果您只有一个线程将东西放入队列,另一个线程从队列中取出东西,那么ConcurrentLinkedQueue可能过于复杂了。它更适用于可能有数百甚至数千个线程同时访问队列的情况。您的需求可能通过使用以下内容来满足:

Queue<YourObject> queue = Collections.synchronizedList(new LinkedList<YourObject>());

这样做的一个优点是它锁定了实例(队列),因此您可以在队列上同步以确保组合操作的原子性(如Jared所解释的)。使用ConcurrentLinkedQueue时,您无法这样做,因为所有操作都是在不锁定实例的情况下完成的(使用java.util.concurrent.atomic变量)。如果您想在队列为空时阻塞,您将不需要执行此操作,因为poll()在队列为空时只会返回null,并且poll()是原子的。检查是否返回null。如果是,则等待(),然后再试一次。不需要锁定。
最后:
老实说,我只会使用LinkedBlockingQueue。虽然对于您的应用程序来说还过剩,但很可能会正常工作。如果它的性能不够好(进行性能分析!),您可以尝试其他东西,这意味着您不必处理任何同步问题:
BlockingQueue<YourObject> queue = new LinkedBlockingQueue<YourObject>();

queue.put(myObject); // Blocks until queue isn't full.

YourObject myObject = queue.take(); // Blocks until queue isn't empty.

其他的都一样。放心使用,可能不会阻塞,因为你不太可能将20亿个对象放入队列中。


谢谢您的回复。 还有一个问题:我需要在remove方法中这样做吗: while (queue.size() == 0) wait(); queue.poll(); - Ricardo Felgueiras
我会在我的回答中进行编辑来回答这个问题,因为它非常重要。 - Adam Jaskiewicz
由于另一个问题引起了混淆,Collection.synchronizedList返回的是一个未实现Queue接口的List - Tom Hawtin - tackline
@AdamJaskiewicz使用ConcurrentLinkedQueue作为生产者消费者的好主意吗?我参考了这篇文章https://dev59.com/9XM_5IYBdhLWcg3wUxZB?noredirect=1&lq=1 - rd22

39

这大部分是另一个问题的重复

以下是与此问题相关的答案部分:

如果我使用java.util.ConcurrentLinkedQueue,我需要进行自己的同步吗?

并发集合上的原子操作已经为您进行了同步。换句话说,队列的每个单独调用都无需您采取任何措施即可保证线程安全。 非原子性的集合操作不是线程安全的。

例如,这是线程安全的,而无需您采取任何措施:

queue.add(obj);
或者
queue.poll(obj);

然而,对队列的非原子调用不会自动地保证线程安全。例如,以下操作不会自动保证线程安全:

if(!queue.isEmpty()) {
   queue.poll(obj);
}

最后一个示例不是线程安全的,因为在调用isEmpty和调用poll之间,其他线程有可能向队列中添加或删除项目。这样执行的线程安全方式如下:

synchronized(queue) {
    if(!queue.isEmpty()) {
       queue.poll(obj);
    }
}

再次强调,对于队列的原子操作是自动线程安全的。非原子操作则不是。


1
我能想到的唯一常见用途是阻塞,直到队列非空。使用BlockingQueue的实现(take()是原子操作,并阻塞直到有可消耗的内容)不是更好吗? - Adam Jaskiewicz
6
请注意,与synchronizedList不同,ConcurrentLinkedQueue不会在自身上进行同步,因此在您的代码中,当您处于同步块时,生产者仍然可以向队列提供数据。请谨慎处理。 - Adam Jaskiewicz
为什么要将queue.isEmpty()和queue.poll()结合起来使用?难道你不能只是调用poll()方法并检查结果是否为空吗?(我的理解是,如果你从一个空的队列中调用poll()方法,它会返回null) - charles-allen
1
除了最后一个代码块,我同意你的所有代码。在ConcurrentLinkedQueue上进行同步并不能保证什么,因为其他调用没有被同步。所以,在您完成线程同步后,在“isEmpty()”和“poll(...)”之间可能会出现来自另一个线程的“add(..)”操作。 - Klitos G.

8

以下代码可能是你在处理队列时所寻找的线程安全和"美观"解决方案:

for (YourObject obj = queue.poll(); obj != null; obj = queue.poll()) {
}

这将确保您在队列为空时停止,只要队列不为空,就可以继续弹出其中的对象。

1
非常有用的清空队列的方法。谢谢。 - DevilCode

6

使用poll方法获取第一个元素,使用add方法添加新的最后一个元素。这就是全部内容,不需要进行同步或其他操作。


2

ConcurrentLinkedQueue是一个非常高效的无等待/无锁实现(参见javadoc),因此您不仅不需要同步,而且队列不会锁定任何内容,因此它几乎与非同步(非线程安全)的队列一样快。


1

只需要像使用非并发集合一样使用它。Concurrent [Collection] 类将常规集合包装起来,这样您就不必考虑同步访问。

编辑:ConcurrentLinkedList实际上不仅仅是一个包装器,而是更好的并发实现。无论哪种方式,您都不必担心同步问题。


ConcurrentLinkedQueue并不是简单的包装器,它是从头开始专门为多个生产者和消费者的并发访问而构建的。比Collections.synchronized*返回的简单包装器要高级一些。 - Adam Jaskiewicz
是的,在J2SE 5中添加了许多不错的并发性能。 - Adam Jaskiewicz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接