new SynchronousQueue()
new LinkedBlockingQueue(1)
这两者有何不同?我应该在什么情况下使用具有容量1的SynchronousQueue
而不是LinkedBlockingQueue
?
new SynchronousQueue()
new LinkedBlockingQueue(1)
这两者有何不同?我应该在什么情况下使用具有容量1的SynchronousQueue
而不是LinkedBlockingQueue
?
SynchronousQueue 更像是一次性交付,而 LinkedBlockingQueue 只允许一个元素。两者的区别在于对于 SynchronousQueue 的 put() 调用不会返回,直到有相应的 take() 调用;但对于大小为 1 的 LinkedBlockingQueue,put() 调用(对于空队列)将立即返回。
我不能说我曾经直接使用过 SynchronousQueue,但它是用于 Executors.newCachedThreadPool()
方法的默认 BlockingQueue。它基本上是当你不真正想要队列时的 BlockingQueue 实现(你不想维护任何待处理数据)。
offer()
,如果未被接受,则线程可以自己运行任务。[最后的1/4应由当前线程处理,如果您想知道为什么是1/4而不是1/3]
考虑试图将任务移交给工人,如果没有工人可用,您可以选择自己执行任务(或抛出异常)。相反,在LBQ中,将任务留在队列中不能保证任何执行。
注意:消费者和发布者的情况是相同的,即发布者可能会阻塞并等待消费者,但在offer
或poll
返回之后,它确保要处理该任务/元素。我认为SynchronousQueue
API文档表述得非常清楚:
- 一个阻塞队列,每个插入操作必须等待另一个线程的对应删除操作,反之亦然。
- 同步队列没有内部容量,甚至没有容量为1。您不能查看同步队列,因为只有在尝试删除元素时才存在元素; 除非另一个线程正在尝试删除它,否则您无法插入元素(使用任何方法); 没有东西可以迭代。
- 队列的头是第一个排队插入线程尝试添加到队列的元素;如果没有这样的排队线程,则没有元素可用于删除,
poll()
将返回null
。
- 一个队列,此外还支持在检索元素时等待队列变为非空,并在存储元素时等待队列中有空间。
所以差异是显而易见的,有点微妙但十分重要,特别是下面的第三点:
BlockingQueue
中检索时队列为空,操作将一直阻塞直到新元素被插入。同样地,如果你在向BlockingQueue
中插入元素时队列已满,操作将一直阻塞直到元素从队列中移除并为新队列腾出空间。然而请注意,在SynchronousQueue
中,操作会被阻塞直到另一个线程执行相反的操作(插入和删除是彼此相反的)。因此,与BlockingQueue
不同,阻塞取决于操作的存在,而不是元素的存在或不存在。peek()
方法总是返回null
(再次查看API文档),而iterator()
方法返回一个空的迭代器,其中hasNext()
方法始终返回false
。(API文档)。然而,请注意,poll()
方法会在另一个线程当前提供元素并且没有这样的线程存在时,整洁地检索和删除此队列的头部,并返回null
。(API文档)最后,一个小提示:无论是SynchronousQueue
还是LinkedBlockingQueue
类都实现了BlockingQueue
接口。
SynchronousQueue与其他队列的工作方式相似,但存在以下主要区别: 1)SynchronousQueue的大小为0 2)只有在take()方法能够立即从队列中获取元素时,put()方法才会插入元素。也就是说,如果消费者的take()调用需要一些时间来消费元素,则无法插入元素。
SynchronousQueue - 仅在某人立即接收它时才插入。
同步队列基本上用于交接目的。它们没有任何容量,并且put操作会被阻塞,直到其他线程执行get操作。
如果我们想要在两个线程之间安全地共享一个变量,我们可以将该变量放入同步队列中,并让另一个线程从队列中取出它。
https://www.baeldung.com/java-synchronous-queue的代码示例
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);
它们还被用于CachedThreadPool中,以实现无限(Integer.MAX)线程创建的效果,随着任务的到来。
CachedPool的coreSize为0,maxPoolSize为Integer.MAX,使用同步队列。
当任务到达队列时,其他任务会被阻塞,直到第一个任务被取出。由于它没有任何队列容量,线程池将创建一个线程,并且该线程将取出任务,从而允许更多的任务放入队列。这将继续进行,直到线程创建达到maxPoolSize。根据timeOut,空闲线程可能会被终止,并创建新的线程,而不会超过maxPoolSize。