C/C++无锁(或非阻塞)环形缓冲区,可以覆盖最旧的数据?

9
我正在寻找一种无锁或非阻塞的方法来创建一个环形缓冲区,用于单个消费者/单个生产者,它将覆盖缓冲区中最旧的数据。我已经阅读了很多无锁算法,在缓冲区满时返回false——即不添加;但我找不到任何伪代码,谈论如何在需要覆盖最旧的数据时进行操作。
我正在使用GCC 4.1.2(工作的限制,我不能升级版本...),并且我有Boost库,在过去我制作了自己的Atomic 变量类型,它与即将推出的规范非常接近(它不完美,但是它是线程安全的并且做我需要的事情)。
当我考虑它时,我想使用这些原子操作确实可以解决问题。以下是我思考的大致伪代码:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

据我所知,这里没有死锁情况,因此我们不必担心(如果上面的实现在伪代码层面上有误,欢迎提供建设性意见)。 然而,我发现一个很大的竞态条件是: 假设缓冲区已满。也就是说,writeIndex+1=readIndex; (1) 发生在调用consume时 (4) 是false,因此我们从缓冲区中读取 (5) 发生了,readIndex增加了一个(因此缓冲区中实际上有空间) (2) 发生了,再次推进readIndex,因此丢失了值。
基本上,这是一个经典的问题,即写者必须修改读者,导致竞争条件。除非每次访问它都阻止整个列表,否则我想不出防止这种情况发生的方法。我错过了什么?

我认为这个问题不存在无锁算法。无锁的生产者/消费者模式之所以有效,是因为生产者和消费者永远不需要触碰相同的数据。但是你提出的方案将两者紧密联系在一起,这将需要某种形式的锁定机制。 - Billy ONeal
1
你绝对不可能出现死锁:因为你没有任何锁。:-D 话虽如此,我认为这个问题的无锁算法最多也只能是困难的。你需要对两个索引进行原子修改,所以你真的需要一个 atomic<pair<int, int>>。你还需要能够原子地发布和消费元素数据,这在一般情况下是不可能无锁完成的,即使对于特定情况,要同时原子修改指针和元素数据也是不可能的,除非使用某种形式的锁。你的算法存在许多竞争条件。 - James McNellis
谢谢大家的评论。是的,看起来我的直觉是正确的......尝试做一个非阻塞版本并不容易。 - Nik
3个回答

7
  1. 从单个生产者/多个消费者队列开始,保证适当的进度。
  2. 如果队列已满并且推入失败,则弹出一个值。然后就有空间推入新值了。

很有趣,但是我看到了在弹出/推入位上潜在的竞争条件 --> 你需要循环直到推入成功,以防另一个线程在你弹出后推入。无论如何,非常聪明的解决方案! - Matthieu M.
谢谢,这确实帮助我以更清晰的方式思考。非常感谢所有发帖的人! - Nik
@Matthieu:这可能是多生产者情况下的一个问题,但是这是单个生产者,因此没有其他线程可以推送。 - Ben Voigt
没错,我漏掉了那个。不过我仍然会使用循环,这样容器就可以在多生产者的情况下被重复使用。 - Matthieu M.

1
我漏掉了什么??
很多东西:
- 比如说,在生产者覆盖t时,你是如何检测和处理它的? - 有很多选择 - 例如,“做 {”将值复制出来;使用修改序列号等检查复制的完整性。“} while (corrupt)”。 - 使用原子数不够,你还需要使用CAS-style循环来影响索引增量(虽然我假设你已经知道这一点,因为你说你已经广泛阅读过相关信息)。 - 内存屏障。
但是,让我们忽略那些低于你的伪代码水平的问题,考虑你明确提出的问题:
- 点(5)实际上需要进行CAS操作。如果在consume()上正确采样/复制了readIndex - 在复制可能是损坏的t之前 - 则如果生产者已经将其递增,CAS指令将失败。而无需重新采样和重试CAS操作,只需继续执行即可。

嗨Tony,对于糟糕的代码/伪代码,抱歉。只是为了确认一下,我在(2)处也需要一个CAS操作吗? - Nik
此外,针对覆盖条件,我认为如果缓冲区实际上被填充了 Atomic < T > 而不是 T 自身,那么我可以解决这个问题。因为访问将受到保护,这应该能解决问题吧? - Nik
@J-16 SDiZ:在代码中,有一行被指定为“// 5”。 @Nik:是的-我提到的前几个问题是系统性的,并不限于点(5)。CAS 应该在 getNextIndex 的实现中使用,在点 5 中 getNextIndex 不适用,如上所述。即使 T 是原子的,你怎么知道你读的是旧的还是新的?你需要更加深入地思考这个问题。这也是一个你必须非常小心你的内存屏障的点。我给了一些提示,但确切的答案可以针对你的数据类型进行调整,或者变得通用但速度较慢。 - Tony Delroy

0

以下是我最近创建的一段关于原子变量的循环缓冲区代码。我已经进行了修改以便“覆盖”数据而不是返回false。 免责声明 - 它尚未经过生产级别测试。

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接