寻找合适的C语言环形缓冲区实现方法

8
我正在寻找一个C语言中的环形缓冲区实现(或伪代码),需要具备以下特征:
  • 多生产者单消费者模式(MPSC)
  • 消费者在空时阻塞
  • 生产者在满时阻塞
  • 无锁(我预计高争用率)
到目前为止,我只使用SPSC缓冲区 - 每个生产者一个 - 但我想避免消费者持续旋转以检查其所有输入缓冲区中的新数据(也许可以摆脱一些系统中的编组线程)。
我在Linux上开发Intel机器。

我不知道您所处的环境是什么,但在 Win32 中,您可以使用 WaitForMultipleObjects 来使消费者等待所有队列而无需旋转。 - Agent_L
1
抱歉,我没有提到我主要在Linux上工作。 - ziu
1
如果你没有得到真正的答案,使用这个链接可以轻松同步多个缓冲区:http://neosmart.net/blog/2011/waitformultipleobjects-and-win32-events-for-linux-and-read-write-locks-for-windows/ - Agent_L
1
请查看有关将Windows IPC应用程序移植到Linux的文章(http://www.ibm.com/developerworks/linux/library/l-ipc2lin3/index.html),其中描述了Windows和Linux之间不同的同步原语的比较。这是一篇多部分文章。我预计需要某种锁定以确保仅有一个进程或线程同时访问生产者和消费者之间队列的管理数据结构。 - Richard Chambers
1
这里有一篇关于“使用临界区(最好是锁)来消除竞争”的有趣文章,发表在Dr. Dobb's上。http://www.drdobbs.com/cpp/use-critical-sections-preferably-locks-t/201804238 - Richard Chambers
显示剩余4条评论
2个回答

4
请参见liblfds,一个无锁的MPMC环形缓冲区。它不会阻塞——无锁数据结构倾向于避免阻塞,因为无锁的目的就是避免阻塞;当数据结构以NULL返回时,需要处理这个问题——如果你尝试在空缓冲区上读取,它将返回NULL,但当写满时,它不符合你的要求;在这种情况下,它会丢弃最旧的元素并将其提供给你进行写操作。
然而,只需要进行小的修改即可获得这种行为。
但可能有更好的解决方案。环形缓冲区的棘手之处在于当缓冲区满时,需要获得最旧的先前元素并重新使用它。但你并不需要这个。我认为你可以使用原子操作重写SPSC内存屏障环形缓冲区。与liblfds中的MPMC环形缓冲区(它是队列和栈的组合)相比,这将更加高效。

到目前为止,我的SPSC实现相当简单:它仅依赖于本地和全局位置计数器来同步写入者和读取者(本地计数器用于批处理元素的推送/拉取并减少虚假共享)。条件变量已经放置好以减少旋转(如果没有可用数据,则无其他事情可做/如果目标已满,则不可避免地会有背压)。如果没有适当的内存屏障,我的实现在另一种架构上将无法工作。 您能详细说明一下您最后一点吗?最终,环形缓冲区始终是SPSC的,对吧? - ziu
1
有一个众所周知的SPSC循环缓冲区,例如在Linux内核中使用,它仅使用内存屏障,在缓冲区满或空时返回NULL。我猜可以通过使用原子操作将其制作为MPMC。 - user82238

3
我想我有你需要的东西。这是一个无锁环形缓冲区实现,可以阻塞生产者/消费者。您只需要访问原子原语 - 在此示例中,我将使用gcc的sync函数。
它有一个已知的bug - 如果您超出了100%的缓冲区溢出,则不能保证队列仍然是FIFO(它最终仍将处理所有内容)。
此实现依赖于读取/写入缓冲区元素作为原子操作(对于指针基本上是保证的)。
struct ringBuffer
{
   void** buffer;
   uint64_t writePosition;
   size_t size;
   sem_t* semaphore;
}

//create the ring buffer
struct ringBuffer* buf = calloc(1, sizeof(struct ringBuffer));
buf->buffer = calloc(bufferSize, sizeof(void*));
buf->size = bufferSize;
buf->semaphore = malloc(sizeof(sem_t));
sem_init(buf->semaphore, 0, 0);

//producer
void addToBuffer(void* newValue, struct ringBuffer* buf)
{
   uint64_t writepos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size;

   //spin lock until buffer space available
   while(!__sync_bool_compare_and_swap(&(buf->buffer[writePosition]), NULL, newValue));
   sem_post(buf->semaphore);
}    

//consumer
void processBuffer(struct ringBuffer* buf)
{
   uint64_t readPos = 0;
   while(1)
   {
       sem_wait(buf->semaphore);

       //process buf->buffer[readPos % buf->size]
       buf->buffer[readPos % buf->size] = NULL;
       readPos++;
   }
}

1
这个例子很有趣。让我看看我是否理解得很好:索引增量和写入是无锁的,而当没有可消费的内容时,您使用信号量形式的锁来阻止消费者。 虽然我不明白如何可能溢出此缓冲区。 此外,您是否在预期旋转时间非常短的系统中使用了此结构?旋转循环的影响如何? - ziu
您可以通过比数据处理速度更快地写入数据来溢出缓冲区。最终,写入索引将会“超过”读取器。此时,写入者必须在自旋锁中等待,直到读取器赶上(否则它将覆盖缓冲区中的数据)。如果您将队列溢出超过100%,则会发生错误。在这种情况下,有多个线程在自旋锁中等待缓冲区可用。不能保证哪个线程将首先写入队列。 - charliehorse55
2
将上面的循环改写为以下方式不是更简单吗?`while(1) { while(__sync_bool_compare_and_swap(&(buf->buffer[writePosition]), NULL, newValue) == false); sem_post(buf->semaphore); break; }` - ziu
是的。我猜是这样。在我的代码中,我甚至不使用自旋锁 - 如果我溢出了缓冲区,我会将锁覆盖在所有内容上,并将缓冲区的大小增加(增加10倍)。 - charliehorse55
1
实际上,使用那种方法,您甚至不需要外部的 while(1) 循环。 - charliehorse55
1
就旋转性能而言-这不应该是一个问题。理念是你的缓冲区足够大,以便它永远不会变满,因此你将永远不会自旋。自旋锁只是一种安全措施,以防溢出缓冲区导致未定义行为。 - charliehorse55

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接