我正在寻找一种无锁或非阻塞的方法来创建一个环形缓冲区,用于单个消费者/单个生产者,它将覆盖缓冲区中最旧的数据。我已经阅读了很多无锁算法,在缓冲区满时返回false——即不添加;但我找不到任何伪代码,谈论如何在需要覆盖最旧的数据时进行操作。
我正在使用GCC 4.1.2(工作的限制,我不能升级版本...),并且我有Boost库,在过去我制作了自己的Atomic 变量类型,它与即将推出的规范非常接近(它不完美,但是它是线程安全的并且做我需要的事情)。
当我考虑它时,我想使用这些原子操作确实可以解决问题。以下是我思考的大致伪代码:
据我所知,这里没有死锁情况,因此我们不必担心(如果上面的实现在伪代码层面上有误,欢迎提供建设性意见)。 然而,我发现一个很大的竞态条件是: 假设缓冲区已满。也就是说,writeIndex+1=readIndex; (1) 发生在调用consume时 (4) 是false,因此我们从缓冲区中读取 (5) 发生了,readIndex增加了一个(因此缓冲区中实际上有空间) (2) 发生了,再次推进readIndex,因此丢失了值。
基本上,这是一个经典的问题,即写者必须修改读者,导致竞争条件。除非每次访问它都阻止整个列表,否则我想不出防止这种情况发生的方法。我错过了什么?
我正在使用GCC 4.1.2(工作的限制,我不能升级版本...),并且我有Boost库,在过去我制作了自己的Atomic 变量类型,它与即将推出的规范非常接近(它不完美,但是它是线程安全的并且做我需要的事情)。
当我考虑它时,我想使用这些原子操作确实可以解决问题。以下是我思考的大致伪代码:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
if(writeIndex == getNextIndex(readIndex)) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNexIndex(readIndex); //5
return true;
}
};
据我所知,这里没有死锁情况,因此我们不必担心(如果上面的实现在伪代码层面上有误,欢迎提供建设性意见)。 然而,我发现一个很大的竞态条件是: 假设缓冲区已满。也就是说,writeIndex+1=readIndex; (1) 发生在调用consume时 (4) 是false,因此我们从缓冲区中读取 (5) 发生了,readIndex增加了一个(因此缓冲区中实际上有空间) (2) 发生了,再次推进readIndex,因此丢失了值。
基本上,这是一个经典的问题,即写者必须修改读者,导致竞争条件。除非每次访问它都阻止整个列表,否则我想不出防止这种情况发生的方法。我错过了什么?
atomic<pair<int, int>>
。你还需要能够原子地发布和消费元素数据,这在一般情况下是不可能无锁完成的,即使对于特定情况,要同时原子修改指针和元素数据也是不可能的,除非使用某种形式的锁。你的算法存在许多竞争条件。 - James McNellis