实现阻塞队列:SynchronousQueue和LinkedBlockingQueue有什么区别?

35
我看到了这些关于BlockingQueue的实现,但是不理解它们之间的区别。我的结论如下:
  1. 我永远不需要使用SynchronousQueue
  2. LinkedBlockingQueue保证先进先出,如果想让BlockingQueue也变成先进先出,则必须创建时传入true参数
  3. SynchronousQueue破坏了大多数集合方法(contains、size等)
那么我何时需要使用SynchronousQueue呢?这种实现的性能是否比LinkedBlockingQueue更好?
让问题更加复杂的是,为什么Executors.newCachedThreadPool在使用SynchronousQueue,而其他线程池(Executors.newSingleThreadExecutor和Executors.newFixedThreadPool)则使用LinkedBlockingQueue?
我的理解是,使用SynchronousQueue时,如果没有自由线程,则生产者将被阻塞。但是由于线程数量实际上是无限的(需要时将创建新线程),因此这永远不会发生。那么为什么要使用SynchronousQueue呢?

2
可能是Java队列实现,哪一个?的重复问题。 - aioobe
这是一些关于不同排队策略的文章: http://letslearnjavaj2ee.blogspot.ru/2013/09/queuing-strategies-for.html - Valya
3个回答

55

SynchronousQueue是一种非常特殊的队列——它在Queue接口下实现了一种约会方法(生产者等待消费者准备就绪,消费者等待生产者准备就绪)。

因此,只有在需要特定语义的特殊情况下才需要使用它,例如单线程处理任务而无需排队更多请求

使用SynchronousQueue的另一个原因是性能。SynchronousQueue的实现似乎经过了大量优化,因此如果您不需要比约会点更多的东西(就像在Executors.newCachedThreadPool()的情况下一样,其中消费者是“按需”创建的,这样队列项目不会积累),则可以通过使用SynchronousQueue获得性能提升。

简单的合成测试表明,在双核机器上的简单单个生产者-单个消费者方案中,SynchronousQueue的吞吐量比队列长度=1的LinkedBlockingQueueArrayBlockingQueue高约20倍。当队列长度增加时,它们的吞吐量也会上升,并且几乎达到SynchronousQueue的吞吐量。这意味着与其他队列相比,SynchronousQueue在多核机器上具有较低的同步开销。但是,这仅在需要伪装为Queue的约会点的特定情况下才重要。

编辑:

这里是一个测试:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

这是我的机器上的结果:

enter image description here


你是怎么得到那张图的?我真的很想通过这张图看到这样的性能。 - Sukh

5

目前默认的 Executors(基于 ThreadPoolExecutor)可以使用一组预先创建的固定大小线程和某些大小的 BlockingQueue 用于任何溢出,或者仅在队列满时创建线程,最多可以创建到最大大小。

这导致了一些令人惊讶的特性。例如,由于仅在队列容量达到时才创建额外的线程,因此使用 LinkedBlockingQueue(无界)意味着即使当前池大小为零,也永远不会创建新线程。如果使用 ArrayBlockingQueue,则只有在队列满时才创建新线程,并且如果池没有在那时清空空间,后续作业很可能会被拒绝。

SynchronousQueue 的容量为零,因此生产者会阻塞,直到有消费者可用或创建线程。这意味着尽管 @axtavt 提供的数据看起来很不错,但缓存的线程池从生产者的角度来看通常具有最差的性能。

不幸的是,目前还没有一个好的库版本的妥协实现,它将在爆发或活动期间从低最小值创建线程,最多达到某个最大值。您要么有一个可增长的池,要么有一个固定的池。我们内部有一个,但它还没有准备好供公众使用。


如果你所说的是正确的,那么这很奇怪。我使用带有LinkedBlockingQueue的缓存线程池,它可以正常工作。你如何解释这个问题? - nanda
这意味着,尽管@axtavt产生的数字看起来令人印象深刻,但从生产者的角度来看,缓存线程池通常具有最差的性能。这正是我在@Peter答案中评论的内容。 - nanda
2
你说你使用带有LinkedBlockingQueue的缓存线程池是什么意思?它们都在幕后使用相同的ThreadPoolExecutor,只有在(a)没有corePoolSize线程正在运行(b)没有可用的线程来处理作业(c)将作业提供给队列时被拒绝(d)未达到maximumPoolSize时才会创建线程。否则就会被拒绝。因此,只有当使用max-capacity明显小于Integer.MAX_VALUE的LinkedBlockingQueue构造时,才会超出核心大小进行线程创建。它只会在队列满时增长。 - Jed Wesley-Smith
“阻塞直到有消费者可用”是不正确的。如果我们已经达到了maxPoolSize,并且所有线程都忙碌,当提交新任务时,行为将取决于RejectedExecutionHandler。如果我错了,请@jed-wesley-smith纠正我。 - Denny Abraham Cheriyan
@DennyAbrahamCheriyan 我特别描述了SynchronousQueue类,它在使用带有超时的offer方法时会阻塞。然而,ThreadPoolExecutor类并不是这样使用它的,它使用立即的offer()方法,当它失败时尝试添加另一个工作线程,直到达到最大工作线程数,在那里它确实会像你提到的那样拒绝。默认的Exectors.newCachedTheadPool()工厂方法将max设置为Integer.MAX_VALUE,因此永远不会达到最大值。 - Jed Wesley-Smith

4

缓存线程池按需创建线程。它需要一个队列,可以将任务传递给等待的消费者或失败。如果没有等待的消费者,它会创建一个新线程。SynchronousQueue 不持有元素,而是将元素传递或失败。


你能让这个句子更清晰吗:“SynchronousQueue通过将任务传递给等待的消费者或失败(如果没有),以便池可以创建另一个线程来支持此操作。也就是说,它从不保留任务。” 我仍然不理解。谢谢。 - nanda
那么,return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) 和 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) 有什么区别呢?第一个是newCachedThreadPool的实现。到目前为止,我只理解第一个会在没有空闲线程时阻塞生产者,但由于线程数是“无限的”,新线程将自动创建。 - nanda
我已经尝试过它,它像缓存线程池一样按照我的预期工作。 - nanda
@Peter:回到最初的问题,为什么要使用SynchronousQueue,如果可以使用LinkedBlockingQueue来模拟相同的行为。这使得newCachedThreadPool的行为与其他线程池不同,因为提交过程会等待线程创建。即使它非常快,但在某些情况下仍然会有影响。 - nanda
如果有一个线程在等待,则同步队列(SynchronousQueue)更快,因为它更轻量级。即使是 LinkedBlockingQueue 或 ArrayBlockingQueue 也必须创建对象才能将元素添加到队列中或从队列中取出元素。为什么 ArrayBlockingQueue 这样做是另一个问题。 - Peter Lawrey
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接