ConcurrentBitSet中的竞态条件

3

我正试图在Java中创建一个并发位集,可以允许大小扩展(与固定长度相反,这是相当简单的)。 这是类的核心部分(其他方法现在不重要)

public class ConcurrentBitSet {

static final int CELL = 32;
static final int MASK = CELL - 1;
final AtomicReference<AtomicIntegerArray> aiaRef;

public ConcurrentBitSet(int initialBitSize) {
    aiaRef = new AtomicReference<>(new AtomicIntegerArray(1 + ((initialBitSize - 1) / CELL)));
}

public boolean get(int bit) {
    int cell = bit / CELL;
    if (cell >= aiaRef.get().length()) {
        return false;
    }
    int mask = 1 << (bit & MASK);
    return (aiaRef.get().get(cell) & mask) != 0;
}

public void set(int bit) {
    int cell = bit / CELL;
    int mask = 1 << (bit & MASK);
    while (true) {
        AtomicIntegerArray old = aiaRef.get();
        AtomicIntegerArray v = extend(old, cell);
        v.getAndAccumulate(cell, mask, (prev, m) -> prev | m);
        if (aiaRef.compareAndSet(old, v)) {
            break;
        }
    }
}

private AtomicIntegerArray extend(AtomicIntegerArray old, int cell) {
    AtomicIntegerArray v = old;
    if (cell >= v.length()) {
        v = new AtomicIntegerArray(cell + 1);
        for (int i = 0; i < old.length(); i++) {
            v.set(i, old.get(i));
        }
    }
    return v;
}

public String toString() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < aiaRef.get().length(); i++) {
        for (int b = 0; b < CELL; b++) {
            sb.append(get(i * CELL + b) ? '1' : '0');
        }
    }
    return sb.toString();
}

}

很不幸,这里似乎存在竞态条件。

这是示例测试代码,每次都会失败几个位 - 它应该打印所有的“1”,直到第300位,但是每次都有一些随机的“0”,位置也不同。在一台电脑上我只得到了一些,而在另一台电脑上,连续的奇偶位置上有8-10个零。

    final ConcurrentBitSet cbs = new ConcurrentBitSet(10);
    CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
        public void run() {
            try {
                latch.await();
                for (int i = 0; i < 300; i += 2) {
                    cbs.set(i);
                }
            } catch (InterruptedException e) {

            }
        };
    }.start();
    new Thread() {
        public void run() {
            try {
                latch.await();
                for (int i = 0; i < 300; i += 2) {
                    cbs.set(i + 1);
                }
            } catch (InterruptedException e) {
            }
        };
    }.start();
    latch.countDown();
    Thread.sleep(1000);
    System.out.println(cbs.toString());

以下是我获取的示例:

11111111111111111111111111111111111111111111111111111101111111111111111111111111011111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111101111111111111111111111111111111111111111111111111111111111111011111111111111111111111111111111111111111111111111100000000000000000000 11111111111111111111111111111111111111110111111111111111111111110101111111111111111111110101011111111111111111111111111111111111010101111111111111111111111111111111111101010111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111100000000000000000000

这很难调试(因为任何复杂的东西都往往会使竞争条件消失),但看起来在两个线程尝试同时扩展数组大小的点,在其中一个失败并不得不重试 while 循环的情况下,aiaRef.get() 在循环的下一部分 - 它已经访问过的部分(如果其他线程也在尝试扩展它)最终会在内部有一些零值。

有人知道错误在哪里吗?


1
不要等待两个线程完成工作的一秒钟,更明智的做法是让主线程.join()每个线程。这样,您就可以确信它们都已经完成了。使用sleep(),不能保证操作系统中的某些故障没有延迟其中一个线程一秒钟或更长时间。 - Solomon Slow
@jameslarge 是的,确实很久没有在现实世界中使用原始线程了(现在总是用Future这个,Future那个)。 - Artur Biesiadowski
1个回答

4
问题在于aiaRef.compareAndSet()只能保护并发的替换,因为AtomicReference的唯一工作是保护其引用的原子性。如果在我们重建数组时,引用的对象同时被并发修改,那么compareAndSet()会成功,因为它正在将相同的引用与自身进行比较,但它可能会错过一些修改。

谢谢,这似乎是问题所在。不幸的是,当我试图提出更好的解决方案时,最终得到的东西可能只是一个大的读/写可重入锁。我可能会一直使用 RW 锁,直到它被证明是一个主要瓶颈。 - Artur Biesiadowski

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接