循环无锁缓冲区

79

我正在设计一个系统,它连接到一个或多个数据流并对数据进行分析,然后根据结果触发事件。在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列中,多个消费者线程读取数据,并且消费者仅对最新数据点加上n个点感兴趣。如果慢的消费者无法跟上,则生产者线程必须阻塞,当没有未处理的更新时,消费者线程当然会阻塞。使用带有读者/写者锁的典型并发队列可以很好地工作,但是数据的输入速率可能很快,因此我想减少锁定开销,尤其是针对生产者的写锁。我认为循环无锁缓冲区就是我需要的。

现在有两个问题:

  1. 循环无锁缓冲区是答案吗?

  2. 如果是,那么在自己编写代码之前,您知道任何公共实现可以满足我的需求吗?

欢迎提供有关实现循环无锁缓冲区的任何指针。

顺便说一下,在Linux上使用C++实现。

一些额外的信息:

对于我的系统来说,响应时间至关重要。理想情况下,消费者线程希望尽快看到任何更新,因为额外的1毫秒延迟可能使系统变得无价值,或者价值大大降低。

我倾向于采用一种半无锁循环缓冲区的设计思路,生产者线程以最快的速度将数据放入缓冲区中,我们称缓冲区头部为A,除非缓冲区已满,否则不会阻塞生产者线程,当A到达缓冲区尾部Z时才会阻塞。每个消费者线程将持有两个指针指向循环缓冲区,P和Pn,其中P是线程的本地缓冲区头部,Pn是P后第n个项目。每个消费者线程在处理完当前的P并且慢est Pn指向缓冲区结尾Z时,将推进它的P和Pn。当P赶上A时,这意味着没有更多新的更新需要处理,消费者线程就会旋转并忙等待A再次前进。如果消费者线程旋转时间过长,可以将其置于睡眠状态,并等待条件变量,但我认为消费者线程占用CPU周期等待更新也可以,因为这不会增加我的延迟(我将拥有比线程更多的CPU核心)。想象一下你有一个循环轨道,生产者在一群消费者前面奔跑,关键是调整系统,使得生产者通常只比消费者领先几步,大多数操作都可以使用无锁技术完成。我明白实现细节并不容易...好吧,非常难,这就是为什么在犯错之前我想从别人的错误中学习的原因。

1
我学到的一件事是要处理大块工作。我不知道你的工作项大小,但如果你能生产更大的块并消耗更大的块,你可以提高效率。你还可以通过消耗可变大小的块来增加效率,这样消费者就不会同时完成并争夺数据队列。 - Zan Lynx
3
高效无锁算法是独特的"雪花",发现它们通常值得发表研究文章。在原作者明确需求与期望解决方案之前,我不会尝试回答这个问题。 - Dave
2
毫秒是在未修改的Linux上非常快的时间限制。如果另一个进程开始运行,那么你很容易错过它。你需要使用实时优先级,即使这样,我也不确定你能否可靠地满足这些期限。你确定你需要如此敏感吗?你能不能只让生产者变得更快,在例如设备驱动程序中实现它们,并放松对消费者的要求? - Doug
为什么不直接使用信号量呢?没有锁定/阻塞,当缓冲区为空时,消费者只会进入睡眠状态,当缓冲区满时,生产者才会进入睡眠状态。这有什么问题吗? - Tomas
如果您确实需要硬实时行为,那么最好考虑在硬实时操作系统上将程序运行为硬实时任务。对于适用于Linux的版本,请查看Xenomai,它可以让您在同一环境中同时运行硬实时和常规(软实时/非实时)进程。 - Jeremy Friesner
显示剩余5条评论
18个回答

46
我近几年来专门研究了无锁数据结构。我阅读了这个领域中的大部分论文(只有约四十篇左右,但只有十到十五篇真正有用:-)。
据我所知,目前还没有发明出一种无锁环形缓冲区。问题在于处理读者超越写者或相反情况下的复杂条件。
如果你没有花至少六个月的时间研究无锁数据结构,不要试图自己编写。你会弄错的,而且可能你并不知道错误存在,直到代码在新平台上部署后失败为止。
然而,我相信你的需求有解决方案。
你应该将一个无锁队列与一个无锁空闲列表配对使用。
空闲列表将为你提供预分配,因此可以省去(财政昂贵的)无锁分配器的要求;当空闲列表为空时,你可以通过立即从队列中取出一个元素并使用它来复制环形缓冲区的行为。

当然,在基于锁的循环缓冲区中,一旦获得锁定,获取元素非常快 - 基本上只需指针解引用 - 但您在任何无锁算法中都不会得到它;它们通常必须费尽心思才能完成任务;失败的自由列表弹出后跟随的出队开销与任何无锁算法需要执行的工作量相当。

迈克尔和斯科特在1996年开发了一个非常好的无锁队列。下面的链接将为您提供足够的细节,以便查找他们论文的PDF文件; Michael and Scott, FIFO

无锁自由列表是最简单的无锁算法,实际上我认为我没有看过它的实际论文。


@Supercat:M&S论文通过使用指针计数器对来显式解决ABA问题;“结构指针_t {ptr:指向node_t的指针,count:无符号整数}”。必要时会增加计数器,因此几乎不可能发生ABA。 - user82238
1
@Blank Xavier:啊,我不知怎么错过了这个。他们的CAS似乎假定存在一种交换两个项结构的指令;在.NET中我不知道有这样的功能。 - supercat
在Win32中,除了64位DCAS之外,几乎所有的操作都有Interlocked*() API函数调用。64位DCAS(例如128位原子CAS)具有不同类型的原型,以下划线开头,这可能是为什么它没有明显存在的原因。我不确定DCAS是否与CAS正确序列化,但我认为它会-基本机制是锁定给定的缓存行,因此DCAS将尝试锁定两个缓存行,而CAS将锁定一个-DCAS将注意到其他人已经锁定了它正在尝试获取的缓存行。 - user82238
3
我可能错了,但我认为除非 volatile 执行内存屏障,否则它不会对编译器或 CPU 的重新排序行为产生任何影响。它所做的只是确保从内存中读取其值,而不是从寄存器或缓存中读取。 - user82238
3
据我所知,volatile内存访问不会与其他volatile访问重新排序。但在ISO C中,这就是你所能得到的全部。在MSVC中,volatile远远超出了这个范围,但现在你应该只使用带有memory_order_releaseseq_cststd::atomic - Peter Cordes
显示剩余11条评论

35

2
队列不是循环缓冲区。 - user82238
29
@Blank Xavier:不,但是循环缓冲区是一个队列。问题需要使用队列。而队列最有效的实现方式是……(等待)循环缓冲区。无论如何,如果你想搜索,你应该搜索“无锁队列”,而不是“无锁循环缓冲区”。 - Norman Ramsey
1
我不明白为什么不能简单地使用信号量?没有锁定/阻塞,当缓冲区为空时仅消费者会进入休眠状态,当缓冲区满时才会进入生产者。这种方式有什么问题吗?如何才能比这更好地实现无锁队列? - Tomas
7
@Tomas: 无锁队列可以更好,因为没有单个锁来充当性能瓶颈。原帖作者特别关注在争用非常高的情况下减少锁的开销。信号量不能解决争用问题。而无锁队列可以。 - Norman Ramsey
1
https://liblfds.org/拥有备受推崇的多生产者/多消费者循环缓冲队列。它在技术上并不是无锁的:在添加条目时休眠的生产者可能会阻塞消费者从其他生产者那里看到任何内容。请参见https://dev59.com/5lcO5IYBdhLWcg3wXwh3,以分析其进度保证。它的争用非常低(如果它不是空的或满的,则生产者和消费者之间没有争用),在实践中可能是您想要的。 - Peter Cordes
@user82238 一个循环缓冲区就是一个带有有限容量的队列的实现,对吧? - U. Windl

12
生产者或消费者在缓冲区为空或已满时需要阻塞的要求表明您应该使用常规锁定数据结构,使用信号量或条件变量使生产者和消费者阻塞,直到有可用数据。无锁代码通常不会在这种情况下阻塞 - 它会自旋或放弃不能完成的操作,而不是使用操作系统进行阻塞。(如果您可以等待另一个线程生成或消耗数据,那么为什么等待锁定另一个线程完成更新数据结构更糟糕?)
在(x86 / x64)Linux上,如果没有争用,则使用互斥锁进行线程内同步相对便宜。重点是将生产者和消费者需要保持锁定的时间最小化。鉴于您说您只关心最后N个记录的数据点,我认为圆形缓冲区可以很好地做到这一点。但是,我真的不理解这与阻止要求和消费者实际消费(删除)他们读取的数据的想法如何相符。(您希望消费者仅查看最后N个数据点,并且不移除它们吗?您希望生产者不关心消费者是否跟得上,只需覆盖旧数据吗?)
此外,正如Zan Lynx评论的那样,当您有许多数据要处理时,可以将数据聚合/缓冲成更大的块。您可以缓冲固定数量的点或在一定时间内接收到的所有数据。这意味着将减少同步操作。但是会引入延迟,但是如果您没有使用实时Linux,则不管怎样都必须处理它。

完全同意第一段。我不认为在这里不使用信号量有任何理由。 - Tomas
1
@TMS,一个具有专用生产者线程和专用消费者线程的应用程序(例如OP所描述的)永远不能被称为“无锁”。在一个具有N个线程的无锁应用程序中,您应该能够无限期地挂起任何组合(N-1)或更少的线程,并且应用程序仍然应该继续取得进展。但是,如果所有生产者都被挂起,并且不允许将“产品”丢弃,则专用消费者线程无法无限期地取得进展;如果没有消费者被允许运行,则生产者无法取得进展。 - Solomon Slow
@jameslarge:“无锁”可以描述算法或数据结构(如队列)。https://en.wikipedia.org/wiki/Non-blocking_algorithm 它通常不适用于整个应用程序。暂停所有生产者线程只意味着队列将变空。但在无锁队列中,随时暂停任何一个或多个线程都不能阻止其他线程能够进行入队和出队。(即使这也不容易实现;高效的实现通常会有一个线程“声明”一个插槽:https://dev59.com/5lcO5IYBdhLWcg3wXwh3) - Peter Cordes
@Doug:是的,如果消费者/生产者发现队列为空/满了,他们应该休眠。但不是说你总是要用传统的锁来实现队列,特别是不要为整个数据结构使用一个大锁。这个队列(与之前的评论相同)在技术上并不是“无锁”的,但它确实允许生产者完全独立于消费者(没有争用),从而可能比你通过锁定获得更好的吞吐量。关于从空到非空的高效唤醒也是一个很好的观点。 - Peter Cordes

7
boost库中的实现值得考虑。它易于使用且性能相当高。我在一台四核i7笔记本电脑上编写了一个测试并运行它(8个线程),每秒得到约4M个入队/出队操作。到目前为止没有提到的另一个实现是MPMC队列。我在同一台电脑上使用32个生产者和32个消费者对此实现进行了一些简单的测试。正如其广告所说,它比boost无锁队列更快。
正如其他答案所述,无锁编程很难。大多数实现都会有难以检测的边角情况,需要进行大量的测试和调试才能修复。这些通常通过在代码中仔细放置内存屏障来修复。在许多学术文章中也可以找到正确性证明。我更喜欢使用暴力工具测试这些实现。您计划在生产中使用的任何无锁算法都应该使用类似TLA+的工具进行正确性检查。

6

关于这个问题,有一系列非常好的文章可以在DDJ上找到(点击链接)。这也说明了这些问题有多么困难,因为这篇文章是对之前一篇错误的文章(点击链接)的更正。在你自己动手之前,请确保你理解了这些错误。


5

我并不是硬件内存模型和无锁数据结构的专家,因此我倾向于在我的项目中避免使用它们,并使用传统的锁定数据结构。

然而,最近我注意到了这个视频:基于环形缓冲区的无锁SPSC队列

这是基于一个名为LMAX disruptor的高性能开源Java库开发的,用于交易系统: LMAX Disruptor

根据上面的演示,您可以将头和尾指针设置为原子操作,并以原子方式检查头是否从后面追上尾部或反之亦然的条件。

下面是一个非常基本的C++11实现:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}

1
我相信许多的load/store只需要memory_order_acquire或者release,而不是默认的seq_cst。这在x86上有很大的区别,因为seq_cst存储需要mfence(或者xchg),但是release存储只是普通的x86存储。StoreLoad屏障在大多数其他架构上也是最昂贵的屏障。(http://preshing.com/20120710/memory-barriers-are-like-source-control-operations/) - Peter Cordes
1
在类布局中,最好将read放在buffer之后,这样它就在与write不同的高速缓存行中。这样,两个线程只会读取彼此写入的高速缓存行,而不是同时写入相同的高速缓存行。此外,它们应该是size_t类型:使用32位指针没有64位计数器的意义。无符号类型使模数运算更加高效(https://godbolt.org/g/HMVL5C)。即使对于几乎所有用途,`uint32_t`也是合理的选择。最好根据大小进行模板化,或者动态分配缓冲区。 - Peter Cordes
1
@BaiyanHuang:因为计算机使用二进制整数,所以你只需要用“AND”保留低n位。例如,x % 8=x & 7,按位与比div甚至是编译时常量除数的技巧要便宜得多。 - Peter Cordes
@PeterCordes 有符号整数在大多数操作中通常更快,如果你的模数是2的常数幂,则可以使用value & (power - 1)来获得与无符号整数相同的优化模操作,但仍允许编译器执行额外的优化,因为有符号操作的边缘情况明确未定义。 - yyny
@yyny - 对于大多数操作来说,它们的速度是相同的。您是否有特定的ISA考虑您的性能声明?在x86-64上,其中一个差异是从32位扩展到64位通常在asm中是免费的,在每次写入32位寄存器时隐式发生。从int到指针宽度的符号扩展需要一种特殊的指令。我知道一些其他ISAs具有不同的标准调用约定(例如,MIPS64将值保持为64位的符号扩展通过/返回,因此对于非内联函数来说,接收带符号参数可能更便宜)。签名溢出UB允许一些优化。 - Peter Cordes
显示剩余2条评论

5
一种有用的减少争用的技术是将项目哈希到多个队列中,并将每个消费者专门用于一个“主题”。

对于您的消费者感兴趣的最新项目数量 - 您不想锁定整个队列并迭代以查找要覆盖的项目 - 只需发布N元组中的项目,即所有N个最近的项目。如果生产者在队列已满(当消费者无法跟上时)时阻塞并使用超时更新其本地元组缓存,则可以获得额外的加分,这样您就不会对数据源施加反向压力。

我也考虑过老板/工人线程模型,其中老板线程向工人线程的私有队列广播更新。我认为这更/少是你要走的方向。我需要再仔细考虑一下,但当我考虑它时,老板/工人似乎有太多的开销,因为所有工人都必须获得相同的更新。 - Shing Yip
1
并不完全是这样 - 我在第一点中的意思是切片您的输入流,以便不是所有线程都竞争相同的锁/队列。 第二点是在生产者端进行缓存,以适应输入峰值,并允许慢消费者不会使生产者停顿。 - Nikolai Fetissov
但是业务逻辑要求所有工作线程都知道正在流入的所有数据。只有一种类型的数据进入,每个数据点都同样重要,因此我无法将传入的流切片并在不同的队列中具有不同的数据。在生产者端进行缓存并捆绑更新数据模型以防止独占已经完成,但这还不足以处理负载。 - Shing Yip
输入域有多大?如果是金融世界中的市场数据之类的东西,你只有有限的、但很大的项目数量和少量类型的更新。工作人员对输入事件是否反应迅速,还是他们自己进行处理,只在必要时轮询你的输入? - Nikolai Fetissov
这类似于金融世界中的市场数据。工作人员进行自己的处理,并在需要时随机访问n个更新历史记录(n是可配置的数字,但在进程的生命周期内不会更改)。我想设计一个系统,在大型和小型n上都能很好地运行,以便我可以拥有一个代码库。 - Shing Yip
你可以通过指针交换将n元组传递给消费者(你需要考虑内存栅栏等问题 - 有序原子),但这样会涉及到像使用危险指针一样的内存管理问题。 - Nikolai Fetissov

5
Sutter知道他的队列不够优化。《多核编程艺术》是一个很好的参考书,但不要相信Java程序员关于内存模型的说法。Ross的链接不能给出明确的答案,因为他们的库存在这样的问题中等等。
除非您想花费大量时间在过度设计之上(从描述来看,这是“追求完美”在缓存一致性方面的常见疯狂情况),否则进行无锁编程会带来麻烦。这需要数年时间,并导致无法首先解决问题并稍后进行优化,这是一种常见病症。

你能否发布一篇关于Sutter队列分析的链接? - Nikolai Fetissov
这完全取决于 DDJ,他的一位关注者在博客中详细介绍了它。重点是对于许多场景并不需要高效的 CAS,即使使用简单的交换操作也能轻松实现细粒度控制。 - rama-jka toti
就这些,谢谢。但我认为它仍然可能存在一些竞争条件。任何像期望隐式屏障或特定的或完全的一致性理解这样敏感的东西都是一个等待在生产中发生问题的问题。我只是不相信那种细节水平能够解决很多问题,而是应该关注应用程序级别的设计,而不是低级别的管道,只有在有意义或被识别为如此时才这样做。我赞扬这个努力、这些书籍,所有这些;但它只是关于一个触及PFX群众市场的难题,即使微软也在努力做好。 - rama-jka toti
只是一个观点,总有比研究管道更重要的工作要做。并行努力在整个领域产生连锁反应,不仅仅是队列,或者说中期1990年代的线程DDJ文章不断创新;也就是说,从NT到后来的Solaris和Unix采用了类似的技术,或者最新的C++工作。后者可能需要很长时间才能完成,并且仍然无法解决没有干净的OO方式来发布P2-Pro样式的乱序宇宙这一事实的问题。 - rama-jka toti
2
Dennis的网站已经迁移到http://landenlabs.com/code/ring/ring.html - 它有一个无锁环形缓冲区。 - LanDenLabs
@user1959190:该无锁实现使用volatile size_t m_rIndex, m_wIndx;而不是C++11 std::atomic来处理索引,但我认为它依赖于从中获取加载/释放存储行为(例如其他线程必须在看到m_buffer[m_wIndex] = value存储之前看到m_wIndex = Next(m_wIndex))。因此,在x86上运行良好,但在ARM / PowerPC /任何其他平台上都会出现问题。此外,它还效率低下,因为它不是将volatile加载到本地变量中,而是在Get()Put()函数中不断重新引用volatile值。 - Peter Cordes

4

4
我同意这篇文章的观点,并建议不要使用无锁数据结构。最近有一篇关于无锁FIFO队列的论文在这里,可以搜索同一作者的其他论文;此外,查尔默斯大学有一篇关于无锁数据结构的博士论文(我找不到链接了)。然而,您没有说明元素的大小——只有字长大小的项目才能有效地使用无锁数据结构,因此如果它们比机器字长(32或64位)大,则必须动态分配元素。如果您动态分配元素,则会将(假定的,因为您尚未对程序进行性能分析,而且基本上是过早优化)瓶颈转移到内存分配器,因此您需要一个无锁内存分配器,例如Streamflow,并将其与应用程序集成。

3
如果您完全预分配元素,则不会给分配器带来压力。 - user82238

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接