LinkedBlockingQueue中的addAll()方法是否线程安全(如果不是,解决方案)?

4
引用文档的话来说:
"BlockingQueue 实现是线程安全的。所有排队方法都使用内部锁定或其他形式的并发控制以原子方式实现其效果。但是,除非在实现中另有规定,否则不保证批量集合操作 addAll、containsAll、retainAll 和 removeAll 是原子执行的。因此,例如,在仅添加 c 中的一些元素后,addAll(c) 可能会失败(抛出异常)。
由于在 LinkedBlockingQueue.addAll() 操作的描述中没有特别说明,因此我必须假设它不是线程安全的。
你是否同意我的观点,为了保证通过 addAll() 添加的所有元素是连续的(即一起添加),唯一的解决方案是每次修改队列(使用 add 或 take 操作)时都使用 Lock?例如:
BlockingQueue<T> queue = new LinkedBlockingQueue<>();
Lock lock = new ReentrantLock();

//somewhere, some thread...
lock.lock();
queue.addAll(someCollection);
lock.unlock();

//somewhere else, (maybe) some other thread...
lock.lock();
queue.take();
lock.unlock();

重要更新: 哇,之前的例子中没人发现一个大问题:由于take()是一个阻塞操作,并且由于需要锁来向队列添加元素,一旦队列为空,程序将进入死锁状态:写入者无法写入,因为锁被take()占用,同时take()将处于阻塞状态,直到队列中有内容被写入(但由于上述原因,这是不可能的)。有什么想法吗?我认为最明显的想法是在take()周围去掉锁定,但这样可能无法保证addAll()的所需原子性。


1
只有当所有对你的 BlockingQueue 的访问都通过那个 Lock 进行时,这才是一个解决方案。 - Sotirios Delimanolis
是的,我觉得那很明显(不过我已经编辑了;)) - justHelloWorld
2个回答

3

addAll仍然是线程安全的,只是它不能提供原子性。

因此,这取决于您的使用情况/期望。

如果您在不显式加锁的情况下使用addAll,则如果其他线程尝试向队列写入(添加新元素),则添加的元素顺序不受保证,可能会混淆。如果这是一个问题,那么是的,您需要加锁。但是addAll仍然是线程安全的,队列不会损坏。

但是通常,队列用于为许多读者/作者提供一种通信方式,并且不需要严格保留插入顺序。

现在,主要问题是,如果队列已满,则add方法会抛出异常,因此addAll操作可能会在中途崩溃,您不知道已添加哪些元素和未添加哪些元素。

如果您的用例允许等待插入元素的空间,则应在循环中使用put方法。

for (E e: someCollection) queue.put(e);

这将一直阻塞,直到有空间添加另一个元素。

手动锁定很棘手,因为每次访问队列时都必须记得添加锁定,这容易出错。因此,如果您确实需要原子性,请编写一个包装器类,该类实现BlockingQueue接口,但在调用底层操作之前使用锁定。


不幸的是,通过addAll()插入的元素不能与通过add()操作添加的其他元素混合使用:如果在正常的add()操作之前暂时调用addAll()操作,则附加到队列的元素必须在第一次操作添加的任何元素之后。 - justHelloWorld
LinkedBlockingQueue没有大小限制。关于使用包装类,我完全同意。 - fps
看看我的重要更新 bug ;) - justHelloWorld
1
您不需要为获取操作加锁。它是线程安全的,并且保留了顺序。只有写操作需要受到保护。 - Zielu
@Zielu 我同意你的看法 :) - justHelloWorld

0

我认为你把线程安全原子性搞混了。据我理解,批量操作是线程安全的,但不是原子操作。

我认为你不需要使用外部的ReentrantLock来使你的BlockingQueue线程安全。实际上,addAll()是通过迭代给定的Collection并对集合中的每个元素调用队列的add()来实现的。由于add()是线程安全的,因此您不需要同步任何内容。

当Javadoc说:

因此,例如在向c中添加一些元素后,addAll(c)可能会失败(抛出异常)。

它意味着当addAll(c)返回时,只有给定集合c的一些元素可能已被添加。但这并不意味着您需要在队列上锁定以调用take()或任何其他操作。

编辑:

根据您的用例,您可以像您建议的那样使用锁,但我会将其内部化到BlockingQueue实现中,这样调用者类就不需要遵守锁/从队列调用某些方法/解锁模式。在使用锁时,我会更加小心,即在try/finally块中使用它:
public class MyQueue<T> extends LinkedBlockingQueue<T> {

    private final Lock lock = new ReentrantLock();

    @Override
    public boolean addAll(Collection<T> c) {
        boolean r = false;
        try {
            this.lock.lock();
            r = super.addAll(c);
        } finally {
            this.lock.unlock();
        }
        return r;
    }

    @Override
    public void add(T e) {
        try {
            this.lock.lock();
            super.add(e);
        } finally {
            this.lock.unlock();
        }
    }

    // You don't need to lock on take(), since
    // it preserves the order in which elements
    // are inserted and is already thread-safe
}

1
我的主要问题是我必须确保通过addAll()操作添加的所有元素都是连续的,因此如果没有锁定,它们可能会与其他并发写操作期间插入的元素混合。请告诉我是否有误 ;) - justHelloWorld
你是正确的。如果你在一个线程中调用 add(oneElem),而在另一个线程中调用 addAll(manyElems),那么 oneElem 可能会被插入到 someElems 的“中间”。你需要依赖队列的插入顺序吗? - fps
当然可以:我的应用程序是为分布式系统项目而设计的,因此为了保证串行化,如果一个操作(或一组写操作)在另一个操作之前被提交(添加到列表中),那么必须维护这个顺序。 - justHelloWorld
@justHelloWorld 好的,我会编辑我的回答并提供一个建议,也许对你有用。 - fps
看看我的重要更新 bug ;) - justHelloWorld
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接