无锁单生产者/单消费者循环缓冲区

6
我一直在研究一个无锁单生产者/单消费者循环缓冲区,源自这个网站,但我不知道为什么需要特定的内存屏障。我已经仔细阅读了关于内存顺序的标准规则多次,但我还是不明白哪里出了问题。
这个实现中,只有一个唯一的线程可以调用push()函数,另一个唯一的线程可以调用pop()函数。
下面是Producer代码:
bool push(const Element& item)
{       
  const auto current_tail = _tail.load(std::memory_order_relaxed);  //(1)
  const auto next_tail = increment(current_tail);

  if(next_tail != _head.load(std::memory_order_acquire))            //(2)               
  {     
    _array[current_tail] = item;                                    //(3)
    _tail.store(next_tail, std::memory_order_release);              //(4)
    return true;
  }
  return false; // full queue
}

这里是 Consumer 的代码:

bool pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);    //(1)
  if(current_head == _tail.load(std::memory_order_acquire))           //(2)
    return false; // empty queue

  item = _array[current_head];                                       //(3)
  _head.store(increment(current_head), std::memory_order_release);   //(4)
  return true;
}

我明白为什么绝对需要Producer (4)Consumer (2)语句,这是因为我们必须确保所有在Producer释放存储器的(4) released store之前发生的写操作在consumer看到存储的值时都能够被看到。
我也理解为什么需要Consumer (4)语句,这是为了确保在执行Consumer (4)存储操作之前,Consumer (3)加载操作已经完成。 问题
  • 为什么需要使用acquire semantic(而不是relaxed)来执行Producer (2)加载操作?这是为了防止编译时或运行时重新排序Producer (3)或(4)之前的条件吗?

每个问题一个问题。 - πάντα ῥεῖ
@πάνταῥεῖ 完成了 ;) - Qzaac
2个回答

5
我们需要证明:
_array[current_tail] = item; // push(3)

确认 (current_head == current_tail) 后执行

item = _array[current_head]; // pop(3)

已完成。我们只能在从单元格复制数据到项目后再覆盖单元格。

_head.load(std::memory_order_acquire) // push(2)

与...同步

_head.store(increment(current_head), std::memory_order_release);   //pop(4)

通过Release-Acquire排序:

所有在原子存储-release(pop(4))在_head上之前发生的内存写入(pop(3))一旦在_head上完成原子加载-acquire(push(2)),就会成为可见的副作用。

因此,在完成push(2)之后,生产者代码保证可以看到pop(3)的结果。这意味着从_array[current_head]中复制的数据已经被复制到项目中,并且此操作的结果对于完成push(2)后的生产者代码是可见的,因此_array[current_head]已经空闲。

另一方面,根据memory_order_acquire加载描述 - 在此加载之前,当前线程中没有读取或写入(push(3))可以重新排序。因此,在完成push(2)加载后,push(3)将被执行,但此时pop(3)已经完成。

item = _array[current_head];                                        //pop(3)
_head.store(increment(current_head), std::memory_order_release);    //pop(4)
-----
    _head.load(std::memory_order_acquire);                          //push(2)
    _array[current_tail] = item;                                    //push(3)         

感谢详细的答复。但是,在"Producer" 代码的条件中,仍有一些问题困扰着我,这是由于可能存在推测执行,因此行 if(next_tail != _head.load(std::memory_order_acquire)) 可能会被编译为const auto head = _head.load(std::memory_order_acquire); //then here perform the stores if(next_tail == head) { //restore he memory as it was }正如您所看到的,由于推测存储,条件已经被颠倒了,如果调用 pop 函数,它可能会损坏内存。对我来说,获取屏障应该在条件中。 - Qzaac
我想说的是,load-acquire 可能会在条件测试之前执行,因此存储仍然可以在条件之前执行(但仍在屏障之后)。更清楚了吗?这并不意味着你写 if(next_tail != _head.load(std::memory_order_acquire)) 就不能被解释为首先进行加载,然后再进行条件判断。 - Qzaac
@Yoo - 不对。_head.load(std::memory_order_acquire)将在_array[current_tail] = item;之前执行。 - RbMm
@RbBm 我已经编辑了问题,以确保我想要表达的意思更加清晰 :) - Qzaac
@Yoo - 我认为你需要问一个完全不同的问题 - 由于推测执行而存储 - 是否可能存在这样的存储。可能存在具有副作用的推测执行。现在你当前的问题只会掩盖你当前问题的本质。if (x) { y = a } - 是否可能在检查 x 之前执行 y = a - RbMm
显示剩余5条评论

3
内存屏障防止CPU在访问队列结构(这里使用索引实现,但指针同样可行)时重新排序未使用互锁的Element对象访问。
按照您的编号,必须在(2)和(4)之间执行(3),并且内存屏障提供了这一点。
在生产者中,您所询问的(2)与(3)的确切情况可以防止在队列已满时进行投机覆盖有效数据(建议的位置与有效数据重叠)。如果没有屏障,即使条件失败,从生产者线程的角度来看,原始数据也将被恢复,但是中间值可能会暂时对消费者可见。

在生产者中,你所询问的(2)与(3)的确切情况可以防止在队列已满时进行推测性地覆盖有效数据。这是否意味着生产者在执行条件之前可能会进行推测性存储,并在测试失败时恢复原始存储?对于条件(if(next_tail != _head.load(std::memory_order_acquire))),仍然存在CPU先执行加载和屏障,然后检查条件的可能性,因此(3)操作仍然可能在屏障和条件之间重新排序,而我们正试图避免这种情况。我哪里错了? - Qzaac
@Yoo:你说的没错,但由于条件不会阻塞流水线,执行推测也没有任何优势。相比之下,等待内存获取很容易阻塞,因此乱序执行是有意义的。零回报并不等同于正式禁止推测,为了理论上的正确性,我认为内存屏障应该在if的大括号内部而不是控制表达式中。 - Ben Voigt
感谢您详细的回答! - Qzaac
我觉得这个答案更直接地回答了原问题,应该被采纳为最佳答案。 - Pavel Bazant

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接