检查大小并执行操作 - ConcurrentLinkedDeque安全吗?

3

如果Deque的大小将超过限制,我需要用新值替换第一个值。我编写了以下代码来解决这个问题:

final class Some {
    final int buffer;
    final Deque<Operation> operations = new ConcurrentLinkedDeque<>();
    // constructors ommited;

    @Override
    public void register(final Operation operation) {
        if (this.operations.size() == this.buffer) {
            // remove the oldest operation
            this.operations.removeFirst();
        }
        // add new operation to the tail
        this.operations.addLast(operation);
    }

    @Override
    public void apply() {
        // take the fresh operation from tail and perform it
        this.operations.removeLast().perform();
    }
}

如您所见,我有两种修改 Deque 的方法。我怀疑在多线程环境下,此代码是否能正常运作。问题是:检查 size(),然后再执行修改 ConcurrentLinkedDeque 操作,是否安全?我想尽量减少锁的使用。如果此代码无法工作,则必须引入锁机制,那么使用 ConcurrentLinkedDeque() 就没有意义了。

final class Some {
    final int buffer;
    final Deque<Operation> operations = new LinkedList<>();
    final Lock lock = new ReentrantLock();
    // constructors ommited;

    @Override
    public void register(final Operation operation) {
        this.lock.lock();
        try {
            if (this.operations.size() == this.buffer) {
                // remove the oldest operation
                this.operations.removeFirst();
            }
            // add new operation to the tail
            this.operations.addLast(operation);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void apply() {
        this.lock.lock();
        try {
            // take the fresh operation from tail and perform it
            this.operations.removeLast().perform();
        } finally {
            this.lock.unlock();
        }
    }
}

这是使用 Lock 的替代方案。这是实现我想要的唯一方法吗?我特别想尝试使用并发集合。


3
不,非“Lock”示例在多线程环境中不安全。未经外部同步,“检查后执行”不是原子操作。此外,除非你不能在运行时添加“Operation”,否则在持有“Lock”的同时调用“perform()”是不合适的。 - Slaw
此外,ConcurrentLinkedDeque#size()需要完全扫描,并且可能会因并发添加/删除而变化。您最好使用ArrayDeque和您的锁定代码。 - Ben Manes
2个回答

4

同时访问的集合是在内部状态方面线程安全的。换句话说,它们:

  • 允许多个线程并发读、写而不必担心内部状态会被破坏
  • 允许迭代和删除,而其他线程正在修改该集合
    • 然而,并非所有都支持。我相信 CopyOnWriteArrayListIterator 不支持 remove() 操作
  • 提供了一些保证,例如happens-before(事件先于关系)
    • 这意味着一个线程的写操作将在后续线程的读操作之前发生

但是,在外部方法调用时它们是跨线程安全的。当您调用一个方法时,它将获取所需的任何锁定,但这些锁定在方法返回时将被释放。如果不小心,这可能会导致检查-然后-执行竞争条件。请仔细查看您的代码。

if (this.operations.size() == this.buffer) {
    this.operations.removeFirst();
}
this.operations.addLast(operation);

以下情况可能发生:
  1. Thread-A 检查大小条件,结果为 false
  2. Thread-A 移动以添加新的 Operation
  3. Thread-A 能够添加 Operation 之前,Thread-B 检查大小条件,也得到了 false 的结果
  4. Thread-B 开始添加新的 Operation
  5. Thread-A 确实添加了新的 Operation
    • 哦,不!Thread-A 添加的 Operation 导致大小阈值被达到
  6. Thread-B 已经过了 if 语句,添加了它的 Operation,使 deque 多了一个 Operation
这就是为什么 check-then-act 需要外部同步,并且你在第二个例子中使用 Lock 进行同步。请注意,你也可以在 Deque 上使用 synchronized 块。
与你的问题无关:在第二个例子中,在持有 Lock 的同时调用 Operation.perform()。这意味着当执行 perform() 时,没有其他线程可以尝试向 Deque 添加另一个 Operation。如果不希望如此,你可以将代码更改为:
Operation op;

lock.lock();
try {
    op = deque.pollLast(); // poll won't throw exception if there is no element
} finally {
    lock.unlock();
}

if (op != null) {
    op.perform();
}

2

从size()的文档中可以看出:

需要注意的是,与大多数集合不同,此方法不是常量时间操作。由于这些双端队列的异步性质,确定当前元素数量需要遍历它们以进行计数。此外,在执行此方法期间可能会更改大小,这种情况下返回的结果将不准确。因此,在并发应用程序中,此方法通常不是非常有用。

虽然@Slaw是正确的,但在遍历过程中也可能发生加法/减法运算。

我在我的软件中不使用size()。我使用AtomicInteger来保留集合中元素的数量。如果count.get() < max,则可以添加元素。对于我的用途而言,稍微超过max并不要紧。您可以在count上使用锁来强制执行规定。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接