无锁单生产者多消费者数据结构使用原子操作

3
最近我有以下示例代码(实际代码更为复杂)。在观看了Hans Boehm在cppcon16上关于原子操作的演讲后,我有些担心我的代码是否可行。 produce由单个生产者线程调用,而consume由多个消费者线程调用。生产者仅按顺序更新数据中的序列号,如2、4、6、8...,但在更新数据之前将其设置为奇数序列号,如1、3、5、7...,以表示数据可能是脏数据。消费者尝试以相同的序列号(2、4、6...)获取数据。
消费者在读取后会再次检查序列号,以确保数据正确(不会在读取期间被生产者更新)。
我认为我的代码在x86_64上(我的目标平台)运行良好,因为x86_64不会对存储与其他存储或加载进行重新排序,但我怀疑它在其他平台上是错误的。
我是否正确地认为数据分配(在生产中)可以移动到“store(n-1)”之上,以使消费者读取损坏的数据,但t == t2仍然成功?
struct S 
{
    atomic<int64_t> seq;
    // data members of primitive type int, double etc    
    ...
};

S s;

void produce(int64_t n, ...) // ... for above data members
{
    s.seq.store(n-1, std::memory_order_release); // indicates it's working on data members

    // assign data members of s
    ...

    s.seq.store(n, std::memory_order_release); // complete updating
}

bool consume(int64_t n, ...) // ... for interested fields passed as reference
{
    auto t = s.load(std::memory_order_acquire);

    if (t == n)
    {
        // read fields
        ...

        auto t2 = s.load(std::memory_order_acquire);
        if (t == t2)
            return true;
    }        

    return false;
}

注意:像这样的无锁结构非常难以正确实现。有许多陷阱:http://blog.memsql.com/common-pitfalls-in-writing-lock-free-algorithms/ - Violet Giraffe
http://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering 正好涉及到这个问题。它对你有答案吗? - Michael Foukarakis
Michael,cppref链接与我的问题非常不同。我试图解决的问题是,生产者多次更新相同的“S s;”,如果生产者没有将数据更新到更高的序列号,则希望消费者读取指定序列号的数据。 - Derek
@VioletGiraffe:这个同步模式已经非常出名了:它是Seqlock的单写者变体。 - Peter Cordes
1个回答

3

当针对x86时,编译时重排序仍然可能会影响程序,因为编译器优化的是保留程序在C++抽象机器上的行为,而不是任何更强的依赖于体系结构的行为。由于我们想避免memory_order_seq_cst,所以可以进行重排序。

是的,您的存储可以按照您的建议进行重新排序。您的加载也可以与t2加载进行重新排序,因为获取加载只是单向屏障。编译器可以完全优化掉t2检查,这是合法的。如果可能发生重排序,编译器可以决定这是始终发生的,并应用as-if规则来生成更有效的代码。(当前编译器通常不会,但根据当前标准的写法,这绝对是被允许的。请参见关于此问题的讨论结论,以及标准提案的链接。)

防止重新排序的选项包括:

  • 使用release和acquire语义,使所有数据成员的存储/加载都是原子性的。(最后一个数据成员的acquire-load将保持t2加载不会先执行。)
  • 使用屏障(也称为栅栏)来对所有非原子性存储和非原子性加载进行排序。

    正如Jeff Preshing所解释的那样,mo_release fence不是mo_release存储, 这是我们需要的双向屏障类型。std::atomic只是回收std::mo_名称,而不是为栅栏提供不同的名称。

    (顺便说一句,非原子性存储/加载应该使用mo_relaxed原子性,因为在它们可能正在被重写的过程中读取它们实际上是未定义的行为,即使您决定不查看读取的内容。)


void produce(int64_t n, ...) // ... for above data members
{
    /*********** changed lines ************/
    std::atomic_signal_fence(std::memory_order_release);  // compiler-barrier to make sure the compiler does the seq store as late as possible (to give the reader more time with it valid).
    s.seq.store(n-1, std::memory_order_relaxed);          // changed from release
    std::atomic_thread_fence(std::memory_order_release);  // StoreStore barrier prevents reordering of the above store with any below stores.  (It's also a LoadStore barrier)
    /*********** end of changes ***********/

    // assign data members of s
    ...

    // release semantics prevent any preceding stores from being delayed past here
    s.seq.store(n, std::memory_order_release); // complete updating
}



bool consume(int64_t n, ...) // ... for interested fields passed as reference
{
    if (n == s.seq.load(std::memory_order_acquire))
    {
        // acquire semantics prevent any reordering with following loads

        // read fields
        ...

    /*********** changed lines ************/
        std::atomic_thread_fence(std::memory_order_acquire);  // LoadLoad barrier (and LoadStore)
        auto t2 = s.seq.load(std::memory_order_relaxed);    // relaxed: it's ordered by the fence and doesn't need anything extra
        // std::atomic_signal_fence(std::memory_order_acquire);  // compiler barrier: probably not useful on the load side.
    /*********** end of changes ***********/
        if (n == t2)
            return true;
    }

    return false;
}

注意额外的编译器屏障(signal_fence仅影响编译时重排序),以确保编译器不会将一次迭代中的第二个存储与下一次迭代中的第一个存储合并,如果在循环中运行此代码。或者更一般地说,为了确保使失效该区域的存储尽可能晚地完成,以减少误报。(对于真正的编译器和在调用此函数之间有大量代码的情况,可能不需要这样做。但是signal_fence永远不会编译成任何指令,并且似乎比将第一个存储保留为mo_release更好的选择。在释放存储线程栅栏都编译为额外指令的体系结构上,松散存储避免了具有两个单独的障碍指令。)
我还担心第一个存储与上一次迭代的释放存储重新排序的可能性。但我认为这永远不会发生,因为两个存储都是针对同一地址的。(在编译时,也许标准允许敌对编译器这样做,但是任何明智的编译器都不会这样做,而是根本不执行其中的一个存储,如果它认为可以通过另一个存储。)在弱序体系结构上的运行时,我不确定针对同一地址的存储是否可能以全局可见的顺序出现。这在实际生活中不应该成为问题,因为生产者可能不会连续调用。

顺便提一下,您正在使用的同步技术是Seqlock,但只有一个写入者。您仅具有序列部分,而没有锁定部分以同步单独的写入者。在多个写入者版本中,写入者将在读取/写入序列号和数据之前获取锁定。(并且不使用函数参数作为序列号,而是从锁中读取)。

C++标准讨论文件N4455(关于原子性编译优化,请参见我在Can num++ be atomic for 'int num'?上的答案的后半部分)将其用作示例。

他们在写入器中对数据项使用release-stores,而不是StoreStore栅栏。(需要使用原子数据项,正如我所提到的这样才能真正正确无误)。

void writer(T d1, T d2) {
  unsigned seq0 = seq.load(std::memory_order_relaxed);  // note that they read the current value because it's presumably a multiple-writers implementation.
  seq.store(seq0 + 1, std::memory_order_relaxed);
  data1.store(d1, std::memory_order_release);
  data2.store(d2, std::memory_order_release);
  seq.store(seq0 + 2, std::memory_order_release);
}

他们谈论让读者的第二个序列号负载有可能与后续操作重新排序,如果编译器这样做是有利的,并在读者中使用t2 = seq.fetch_add(0, std::memory_order_release)作为潜在的获取具有释放语义的负载的方法。对于当前的编译器,我不建议这样做;在x86上,你很可能会得到一个被锁定的操作,而我上面建议的方法没有任何(或任何实际的障碍指令,因为只有完全障碍seq_cst栅栏需要在x86上使用指令)。


@cmaster:你认为release-store是一个双向的StoreStore屏障,可以对所有其他存储进行排序。但实际上并不是这样的,正如Jeff Preshing在这篇文章中所解释的那样,release-store和独立的atomic_thread_fence(mo_release)之间存在区别,后者才是一个屏障。众所周知,release-stores只是单向屏障,acquire-loads也是如此,Jeff的文章链接到了一些其他提到这一点的文章/演讲。 - Peter Cordes
哎呀,是的,你说得对。抱歉,我刚才删除了我的评论。不幸的是,我现在无法取消我的反对票 :-( - cmaster - reinstate monica
@cmaster:我发现了一个需要修正的编辑错误,并在第一段进行了扩展,也许可以防止未来读者产生与你相同的“这听起来不对”的印象。所以现在你可以修改你的投票 :) - Peter Cordes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接