无锁队列--单生产者,多消费者

18

我正在寻找一种支持单生产者和多消费者的无锁队列数据结构实现方法。我已经研究了Maged Michael和Michael Scott(1996)的经典方法,但他们的版本使用了链表。我希望有一种实现方式,可以使用有限循环缓冲区,同时使用原子变量实现。

另外,我不确定为什么这些经典方法都是设计用于需要大量动态内存管理的链表中。在多线程程序中,所有内存管理例程都是串行化的。使用锁定方法与动态数据结构相结合,难道不会削弱无锁方法的优势吗?

我正在尝试在Intel 64位体系结构上使用pthread库编写C/C++代码。

谢谢, Shirish


1
有限大小的缓冲区意味着如果其中没有空间,生产者可能会失败。这对您可接受吗? - user319799
1
还要注意,在C++中,您可以为std::list提供自己的分配器。由于您只有一个生产者,因此此分配器不需要同步。例如,它可以从预分配的缓冲区中“分配”列表节点,并在空间用尽时使用全局同步的类似于“真实”的malloc()分配器来分配新的缓冲区。这意味着它仅在1%的调用中使用同步。 - user319799
如果你想要优化线程的内存使用,那么tcmalloc是一个非常好的库。由于它为每个线程维护内存池,所以它可能避免了内存例程序列化问题。 - Jonathan M Davis
根据我的研究,Intel TBBMalloc 比 Google TCMalloc 更具可扩展性。您也可以提供自己的分配器或使用 Intel TBB cache_aligned_allocator<T> 传递给 std::list 或 std::queue。 - Viet
有一些malloc实现是无锁的或大多数情况下无锁的,还有许多线程感知的malloc可以在许多情况下避免锁定。 - DrPizza
可能是Circular lock-free buffer的重复问题。 - Bob Gilmore
4个回答

10
使用循环缓冲区需要锁定,因为需要阻塞以防止头部超过尾部。但是,头和尾指针可以轻松地进行原子更新。或者在某些情况下,缓冲区可能非常大,覆盖不是问题。(在现实生活中,您将在自动化交易系统中看到这一点,其中循环缓冲区的大小为X分钟的市场数据。如果您落后于X分钟,则会遇到比覆盖缓冲区更糟糕的问题)。
当我在C ++中实现MS队列时,我使用堆栈构建了一个无锁分配器,这很容易实现。如果我有MSQueue,那么在编译时我就知道sizeof(MSQueue :: node)。然后,我制作了一个所需大小的N个缓冲区的堆栈。 N可以增长,即如果pop()返回null,则很容易向堆请求更多块,并将它们推入堆栈。除了可能阻塞调用以获取更多内存之外,这是一个无锁操作。
请注意,T不能具有非平凡的dtor。我曾经开发过一版允许非平凡dtors的版本,实际上起作用了。但是我发现,只需将T指针指向我想要的T,其中生产者释放所有权,消费者获得所有权即可。当然,这要求使用无锁方法分配T本身,但是我使用的与堆栈相同的分配器在这里也适用。
无论如何,无锁编程的重点不在于数据结构本身变慢。重点是:
  1. 无锁使我独立于调度程序。基于锁的编程取决于调度程序,以确保锁的持有者正在运行,以便他们可以释放锁。这就是导致“优先级反转”的原因。在Linux上有一些锁属性可以确保发生这种情况
  2. 如果我独立于调度程序,则操作系统更容易管理时间片,并且上下文切换更少
  3. 使用无锁方法编写正确的多线程程序更容易,因为我不必担心死锁、活锁、调度、同步等。在共享内存实现中,特别是真正的问题是,进程可能会在持有锁的情况下死亡,而没有办法释放锁
  4. 无锁方法更容易扩展。事实上,我已经使用网络消息实现了无锁方法。这样的分布式锁是一场噩梦
话虽如此,还有许多情况下,基于锁的方法更可取和/或必需。
  1. 当更新昂贵或无法复制的事物时,大多数无锁方法使用某种版本控制,即复制对象,更新它,并检查共享版本是否与您复制它时相同,然后将当前版本作为您要更新的版本。否则再次复制它,应用更新并再次检查。反复执行此操作直到成功。如果对象很小,则可以使用此方法,但是如果对象很大或包含文件句柄等,则不建议使用。
  2. 大多数类型都无法以无锁方式访问,例如任何STL容器。这些具有需要非原子访问的不变量,例如assert(vector.size() == vector.end() - vector.begin())。因此,如果您正在更新/读取共享的向量,则必须锁定它。

7

1

对于传统的单块循环缓冲区,我认为使用原子操作无法安全地完成此操作。您需要在一次读取中完成如此多的操作。假设您有一个具有以下结构的结构:

uint8_t* buf;
unsigned int size; // Actual max. buffer size
unsigned int length; // Actual stored data length (suppose in write prohibited from being > size)
unsigned int offset; // Start of current stored data

在读取时,您需要执行以下操作(这是我实现的方式,您可以交换一些步骤,如后面所讨论的):

  1. 检查读取长度是否不超过存储长度
  2. 检查偏移量+读取长度是否不超过缓冲区边界
  3. 读取数据
  4. 增加偏移量,减少长度

为使此操作正常工作,您应该同步执行哪些操作(因此是原子操作)?实际上将步骤1和4合并为一个原子步骤,或者澄清一下:请按以下方式同步执行:

  1. 检查read_length,这可以是类似于read_length=min(read_length,length);的内容
  2. 使用read_length减少长度:length-=read_length
  3. 从偏移量获取本地副本:unsigned int local_offset = offset
  4. 使用read_length增加偏移量:offset+=read_length

然后你只需要从你的本地偏移量开始执行memcpy(或其他操作),检查你的读取是否超过了循环缓冲区大小(分成2个memcpy),...。这是相当安全的线程,但你的写入方法仍然可能会覆盖你正在读取的内存,所以确保你的缓冲区足够大以最小化这种可能性。

现在,虽然我可以想象你可以在原子操作中组合3和4(我想这就是他们在链表案例中所做的),甚至在1和2中进行组合,但我不能看到你可以在一个原子操作中完成所有这些步骤 :)

不过,如果你的消费者非常聪明并且总是知道要读取什么,你可以尝试放弃“长度”检查。然后你还需要一个新的woffset变量,因为通过(offset+length)%size来确定写入偏移量的旧方法将不再起作用。请注意,这接近于链表的情况,实际上你总是从列表中读取一个元素(=固定,已知大小)。同样,在这里,如果你把它变成循环链表,你就可能会读取太多或写入一个正在读取的位置!

最后,我的建议是使用锁,我使用了一个CircularBuffer类(完全安全的读写),用于实时720p60视频流,并且在加锁方面没有任何速度问题。


0

这是一个老问题,但是没有人提供确切的答案。考虑到它仍然在搜索结果中排名较高,应该有一个解决方案存在。

可能有多种解决方案,但这里有一种已经有实现: https://github.com/tudinfse/FFQ 自述文件中引用的会议论文详细介绍了算法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接