我正在使用std::atomic结构进行尝试,并编写了这个无锁多生产者多消费者队列,我在此处附上代码。队列的思想基于两个栈——生产者栈和消费者栈,它们本质上是链接列表结构。列表的节点包含索引,可以读取或写入实际数据的数组。
列表节点应该是互斥的,即指向节点的指针只能存在于生产者或消费者列表中。生产者将尝试从生产者列表获取节点,消费者从消费者列表获取节点。每当生产者或消费者获得节点的指针时,它应该从两个列表中删除,以便没有其他人可以获取它。我使用std::atomic_compare_exchange函数自旋直到弹出节点。
问题是,逻辑可能有问题,或者操作不是原子操作,因为即使有1个生产者和1个消费者,足够长时间后,队列仍会死锁。我注意到的是,如果你断言cell!= cell->m_next,那么断言就会被触发!所以这很可能是一个一直在我面前的问题,而我却没看出来,所以我想请有经验的人提供一些帮助。
谢谢
列表节点应该是互斥的,即指向节点的指针只能存在于生产者或消费者列表中。生产者将尝试从生产者列表获取节点,消费者从消费者列表获取节点。每当生产者或消费者获得节点的指针时,它应该从两个列表中删除,以便没有其他人可以获取它。我使用std::atomic_compare_exchange函数自旋直到弹出节点。
问题是,逻辑可能有问题,或者操作不是原子操作,因为即使有1个生产者和1个消费者,足够长时间后,队列仍会死锁。我注意到的是,如果你断言cell!= cell->m_next,那么断言就会被触发!所以这很可能是一个一直在我面前的问题,而我却没看出来,所以我想请有经验的人提供一些帮助。
谢谢
#ifndef MTQueue_h
#define MTQueue_h
#include <atomic>
template<typename Data, uint64_t queueSize>
class MTQueue
{
public:
MTQueue() : m_produceHead(0), m_consumeHead(0)
{
for(int i=0; i<queueSize-1; ++i)
{
m_nodes[i].m_idx = i;
m_nodes[i].m_next = &m_nodes[i+1];
}
m_nodes[queueSize-1].m_idx = queueSize - 1;
m_nodes[queueSize-1].m_next = NULL;
m_produceHead = m_nodes;
m_consumeHead = NULL;
}
struct CellNode
{
uint64_t m_idx;
CellNode* m_next;
};
bool push(const Data& data)
{
if(m_produceHead == NULL)
return false;
// Pop the producer list.
CellNode* cell = m_produceHead;
while(!std::atomic_compare_exchange_strong(&m_produceHead,
&cell, cell->m_next))
{
cell = m_produceHead;
if(!cell)
return false;
}
// At this point cell should point to a node that is not in any of the lists
m_data[cell->m_idx] = data;
// Push that node as the new head of the consumer list
cell->m_next = m_consumeHead;
while (!std::atomic_compare_exchange_strong(&m_consumeHead,
&cell->m_next, cell))
{
cell->m_next = m_consumeHead;
}
return true;
}
bool pop(Data& data)
{
if(m_consumeHead == NULL)
return false;
// Pop the consumer list
CellNode* cell = m_consumeHead;
while(!std::atomic_compare_exchange_strong(&m_consumeHead,
&cell, cell->m_next))
{
cell = m_consumeHead;
if(!cell)
return false;
}
// At this point cell should point to a node that is not in any of the lists
data = m_data[cell->m_idx];
// Push that node as the new head of the producer list
cell->m_next = m_produceHead;
while(!std::atomic_compare_exchange_strong(&m_produceHead,
&cell->m_next, cell))
{
cell->m_next = m_produceHead;
}
return true;
};
private:
Data m_data[queueSize];
// The nodes for the two lists
CellNode m_nodes[queueSize];
volatile std::atomic<CellNode*> m_produceHead;
volatile std::atomic<CellNode*> m_consumeHead;
};
#endif