何时应该使用SynchronousQueue而不是LinkedBlockingQueue?

65
new SynchronousQueue()
new LinkedBlockingQueue(1)

这两者有何不同?我应该在什么情况下使用具有容量1的SynchronousQueue而不是LinkedBlockingQueue

6个回答

65

SynchronousQueue 更像是一次性交付,而 LinkedBlockingQueue 只允许一个元素。两者的区别在于对于 SynchronousQueue 的 put() 调用不会返回,直到有相应的 take() 调用;但对于大小为 1 的 LinkedBlockingQueue,put() 调用(对于空队列)将立即返回。

我不能说我曾经直接使用过 SynchronousQueue,但它是用于 Executors.newCachedThreadPool() 方法的默认 BlockingQueue。它基本上是当你不真正想要队列时的 BlockingQueue 实现(你不想维护任何待处理数据)。


13
当多个线程生产对象并将其放入队列的速度快于消费者消费和处理它们时,队列可能会变得过大。SynchronousQueue可以帮助控制通信,而不需要在生产者中编写任何特定的代码。在现实生活中,这类似于一个会议,在会议上一个人回答其他人提出的问题。可以将SynchronousQueue看作是一种秘书。 - andrey
1
我使用SynchronousQueue的情况是在“流水线”场景中。假设您有一个处理阶段的管道,其中一些数据块从“生产者”开始沿着管道传递,最终到达“消费者”。假设所有阶段都有点确定性,那么实际队列就过度了。你只需要在阶段之间进行交接。如果数据块很大,则这一点非常重要,因为您不希望创建太多数据块。这类似于旧的“双缓冲”策略。 - Wheezil
1
一个非常具体的例子是数据库加载器。假设您想要扫描一个分隔符文本文件并将其加载到数据库中。您有两个阶段——一个阶段扫描文件并生成“记录”块以进行插入(记录块可以是对象的二维数组),另一个阶段调用JDBC进行记录插入,每个块都有自己的事务/批处理。这些东西非常好地重叠在一起。 - Wheezil
@Wheezil - 听起来你描述的处理过程基本上是单线程的。那么队列和多个线程的意义是什么呢? - jtahlborn
2
如果将一个本来是单线程的操作分为多个阶段,并使用工作项在各个阶段之间进行流水线处理,每个阶段都在自己的线程中运行,那么就可以通过重叠获得并发性。在这种情况下,“扫描CSV”和“插入记录”是两个阶段,它们可以并发执行以使数据库插入达到饱和状态。 - Wheezil
显示剩余3条评论

10
据我理解,以上代码执行的是相同的操作。 不,这些代码完全不相同。 Sync.Q.需要有等待者才能成功提供服务。即使没有等待者,LBQ也会保留该项并且立即完成提供服务。 SyncQ对于任务移交非常有用。想象一下,您有一个挂起任务列表和3个可用线程在队列中等待,如果使用1/4的列表尝试offer(),如果未被接受,则线程可以自己运行任务。[最后的1/4应由当前线程处理,如果您想知道为什么是1/4而不是1/3] 考虑试图将任务移交给工人,如果没有工人可用,您可以选择自己执行任务(或抛出异常)。相反,在LBQ中,将任务留在队列中不能保证任何执行。 注意:消费者和发布者的情况是相同的,即发布者可能会阻塞并等待消费者,但在offerpoll返回之后,它确保要处理该任务/元素。

8

5

我认为SynchronousQueue API文档表述得非常清楚:

  1. 一个阻塞队列,每个插入操作必须等待另一个线程的对应删除操作,反之亦然。
  2. 同步队列没有内部容量,甚至没有容量为1。您不能查看同步队列,因为只有在尝试删除元素时才存在元素; 除非另一个线程正在尝试删除它,否则您无法插入元素(使用任何方法); 没有东西可以迭代。
  3. 队列的头是第一个排队插入线程尝试添加到队列的元素;如果没有这样的排队线程,则没有元素可用于删除,poll()将返回null

以及BlockingQueue API文档:

  1. 一个队列,此外还支持在检索元素时等待队列变为非空,并在存储元素时等待队列中有空间。

所以差异是显而易见的,有点微妙但十分重要,特别是下面的第三点:

  1. 如果你从BlockingQueue中检索时队列为空,操作将一直阻塞直到新元素被插入。同样地,如果你在向BlockingQueue中插入元素时队列已满,操作将一直阻塞直到元素从队列中移除并为新队列腾出空间。然而请注意,在SynchronousQueue中,操作会被阻塞直到另一个线程执行相反的操作(插入和删除是彼此相反的)。因此,与BlockingQueue不同,阻塞取决于操作的存在,而不是元素的存在或不存在。
  2. 由于阻塞取决于相反操作的存在,元素实际上永远不会真正插入队列中。这就是为什么第二点说:“同步队列没有任何内部容量,甚至没有一个容量。”
  3. 因此,peek()方法总是返回null(再次查看API文档),而iterator()方法返回一个空的迭代器,其中hasNext()方法始终返回false。(API文档)。然而,请注意,poll()方法会在另一个线程当前提供元素并且没有这样的线程存在时,整洁地检索和删除此队列的头部,并返回null。(API文档

最后,一个小提示:无论是SynchronousQueue还是LinkedBlockingQueue类都实现了BlockingQueue接口。


1

SynchronousQueue与其他队列的工作方式相似,但存在以下主要区别: 1)SynchronousQueue的大小为0 2)只有在take()方法能够立即从队列中获取元素时,put()方法才会插入元素。也就是说,如果消费者的take()调用需要一些时间来消费元素,则无法插入元素。

SynchronousQueue - 仅在某人立即接收它时才插入。


0

同步队列基本上用于交接目的。它们没有任何容量,并且put操作会被阻塞,直到其他线程执行get操作。

如果我们想要在两个线程之间安全地共享一个变量,我们可以将该变量放入同步队列中,并让另一个线程从队列中取出它。

https://www.baeldung.com/java-synchronous-queue的代码示例

    ExecutorService executor = Executors.newFixedThreadPool(2);
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

它们还被用于CachedThreadPool中,以实现无限(Integer.MAX)线程创建的效果,随着任务的到来。

CachedPool的coreSize为0,maxPoolSize为Integer.MAX,使用同步队列。

当任务到达队列时,其他任务会被阻塞,直到第一个任务被取出。由于它没有任何队列容量,线程池将创建一个线程,并且该线程将取出任务,从而允许更多的任务放入队列。这将继续进行,直到线程创建达到maxPoolSize。根据timeOut,空闲线程可能会被终止,并创建新的线程,而不会超过maxPoolSize。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接