Is there a thread-safe, non-blocking queue class in C++?
Probably a basic question, but I haven't been doing C++ for a while...
EDIT: removed the STL requirement.
#include <cstdint>

// Be aware that copying this structure has to be done atomically...
template <class T>
struct pointer
{
    T *ptr;
    std::uintptr_t tag; // tag paired with ptr (typically a counter guarding against ABA)
};
If you want to wrap the lock cmpxchg{8|16}b instruction in some inline assembly, you can write the queue node like this:
template <class T>
struct queue_node
{
    T value;
    pointer<queue_node<T> > next;
};
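The tag next to the pointer lets a compare-and-swap tell a recycled value from the one it originally read. As a portable stand-in for the double-width lock cmpxchg16b approach, the sketch below packs a 32-bit slot index and a 32-bit tag into one 64-bit word, which std::atomic can swap without inline assembly. This is a swapped-in technique, not the code from this answer, and the names tagged_ref, pack, unpack and try_swap are hypothetical.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for pointer<T>: a 32-bit slot index plus a 32-bit tag,
// packed into one 64-bit word so a single-width CAS covers both fields.
struct tagged_ref
{
    std::uint32_t index;
    std::uint32_t tag;
};

inline std::uint64_t pack(tagged_ref r)
{
    return (static_cast<std::uint64_t>(r.tag) << 32) | r.index;
}

inline tagged_ref unpack(std::uint64_t word)
{
    return tagged_ref{ static_cast<std::uint32_t>(word & 0xFFFFFFFFu),
                       static_cast<std::uint32_t>(word >> 32) };
}

// Replaces the index only if the cell still holds exactly `expected`
// (same index AND same tag). The tag is bumped on every successful swap,
// so a stale snapshot fails even if the index has cycled back.
inline bool try_swap(std::atomic<std::uint64_t> &cell, tagged_ref expected,
                     std::uint32_t new_index)
{
    std::uint64_t old_word = pack(expected);
    std::uint64_t new_word = pack(tagged_ref{ new_index, expected.tag + 1 });
    return cell.compare_exchange_strong(old_word, new_word);
}
```

A thread that read index 7 with tag 0 will fail its swap if the cell has since gone 7 -> 9 -> 7, because the tag is then 2 rather than 0.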
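Since the current C++ standard (pre-C++11, at the time of writing) doesn't even acknowledge the existence of threads, there is most certainly nothing in the STL or any other part of the standard library that is thread-safe.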
http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
https://github.com/cameron314/concurrentqueue
Single producer, single consumer: http://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++
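dequeue() would return a special value meaning "empty" rather than waiting for the queue to become non-empty. I guess this answer assumes the latter. Not sure which one the asker meant.
std::queue and boost::mutex would be enough. Non-blocking means that no mutexes, semaphores, critical sections, etc. are needed.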
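The two readings of "non-blocking" in these comments can be made concrete. The sketch below wraps std::queue in a mutex (std::mutex rather than boost::mutex, an assumption made so the example is self-contained) and gives try_pop() the first meaning: it returns false right away on an empty queue instead of waiting. It still takes a lock, so it is not non-blocking in the second, lock-free sense. The class name LockedQueue is hypothetical.

```cpp
#include <mutex>
#include <queue>

// Hypothetical wrapper: thread safe through a mutex, never waits for data.
template <class T>
class LockedQueue
{
    std::queue<T> items;
    std::mutex guard;
public:
    void push(const T &item)
    {
        std::lock_guard<std::mutex> lock(guard);
        items.push(item);
    }

    // Returns false immediately if the queue is empty; otherwise moves the
    // front element into `out` and returns true.
    bool try_pop(T &out)
    {
        std::lock_guard<std::mutex> lock(guard);
        if (items.empty()) return false;
        out = items.front();
        items.pop();
        return true;
    }
};
```

The caller decides what to do on false (retry, yield, do other work), which is the "special value meaning empty" behaviour described above.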
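For a single producer and a single consumer, that is, one thread pushing elements and another thread popping them, a circular queue like the one below is enough in my opinion.
Here is the code, along with notes on other possibilities.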
// --------- class: TNQueue
//
// Implements a fixed size circular queue which is non-blocking and thread safe provided that
//
// - there is only one producer, that is one thread that pushes into the queue
// - and only one consumer, a thread that pops the queue
// - producer has to handle fullness of the queue (queue will not block)
// - consumer has to handle emptiness of the queue (queue will not block)
//
// Safety: The queue is thread safe if the given conditions are met because the two threads change
// different indexes (push changes writeIndx and pop changes readIndx). Although both read the two indexes in isFull()
// and isEmpty(), the evaluation is "conservative", that is, it still works in the worst case.
//
// Counting: Counting elements is not possible in a non-blocking and still thread-safe way. For example,
// a counter as a member variable would have to be changed by both push and pop. And a function
// computing the count from writeIndx and readIndx is not achievable, mainly because of the circular
// nature of the queue: the function would need the modulo (%) operation, which may yield a nonsense
// value if one of the indexes changes during the computation.
//
// Blocking with a counting semaphore: If blocking is required, that is, the producer blocks when the queue is full
// and the consumer blocks when the queue is empty, it can be implemented efficiently using a counting semaphore
// that reflects the number of elements in the queue. Push then increments the semaphore (release) and pop
// decrements it (wait). This solution provides the element count (from the semaphore) and allows using
// all N elements of the storage instead of only N-1 (checking the indexes is unnecessary).
//
// Multiple producers or consumers or both: if multiple producers, multiple consumers, or both are needed, then
// a mutex for write and a mutex for read would allow that. Having a single mutex for both operations is quite
// inefficient; it would block in many unnecessary situations, for example when the queue is almost empty but a producer
// pushing one element blocks the queue for any reader.
//
// (*) Disclaimer: it was difficult for me to find good information about the subject on the internet; as an example, I had to change
// the code of https://en.wikipedia.org/wiki/Circular_buffer#Circular_buffer_mechanics (so the modification
// at 10:30 on 13 May 2023 by Elxala comes from me, that is, from the same source as this code; be aware!)
// Also, as a curiosity, a discussion with ChatGPT about this queue produced wrong answers from ChatGPT (not the ChatGPT4 version).
// So, with that said, do not take the code and the notes as the truth but just as my strong opinion.
// If any of these notes requires correction, feedback would be appreciated.
//
#include <atomic>
#include <cstdio>

template<typename T, int const N>
class TNQueue
{
protected:
    T storage [N]; // Note: N-1 is the actual capacity, see isFull method
    // std::atomic avoids a data race on the indexes, which are each written
    // by one thread and read by the other
    std::atomic<int> writeIndx;
    std::atomic<int> readIndx;
public:
    TNQueue ():
        writeIndx (0),
        readIndx (0)
    {
    }
    bool isFull ()
    {
        return (writeIndx + 1) % N == readIndx;
    }
    bool isEmpty ()
    {
        return readIndx == writeIndx;
    }
    // Called only by the producer thread
    bool push (T item)
    {
        if (isFull ()) return false;
        storage[writeIndx] = item;
        writeIndx = (writeIndx + 1) % N;
        return true;
    }
    // Called only by the consumer thread
    bool pop (T & item)
    {
        if (isEmpty ()) return false;
        item = storage[readIndx];
        readIndx = (readIndx + 1) % N;
        return true;
    }
};
int main ()
{
    TNQueue<int, 15> cua;
    // basic test
    //
    int value = 1001;
    while (cua.push (value ++));
    while (cua.pop (value))
        printf ("read %d\n", value);
    return 0;
}
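The note on multiple producers or consumers (one mutex for write, one for read) can be sketched as follows. This is only an illustration under stated assumptions: the class name LockedRing is hypothetical, std::mutex is used for the two locks, and the indexes are std::atomic so that the one active producer and the one active consumer can read each other's index safely.

```cpp
#include <atomic>
#include <mutex>

// Hypothetical variant of the ring above for several producers and consumers.
template <typename T, int N>
class LockedRing
{
    T storage[N];                    // N-1 usable slots, as in TNQueue
    std::atomic<int> writeIndx{0};
    std::atomic<int> readIndx{0};
    std::mutex writeGuard;           // serializes producers only
    std::mutex readGuard;            // serializes consumers only
public:
    bool push(const T &item)
    {
        std::lock_guard<std::mutex> lock(writeGuard);
        int w = writeIndx.load();
        int next = (w + 1) % N;
        if (next == readIndx.load()) return false;  // full, do not wait
        storage[w] = item;
        writeIndx.store(next);       // publish the slot to consumers
        return true;
    }

    bool pop(T &item)
    {
        std::lock_guard<std::mutex> lock(readGuard);
        int r = readIndx.load();
        if (r == writeIndx.load()) return false;    // empty, do not wait
        item = storage[r];
        readIndx.store((r + 1) % N); // release the slot to producers
        return true;
    }
};
```

Because producers and consumers take different locks, a producer pushing one element never stalls a reader, which is the inefficiency of the single-mutex design mentioned in the notes.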
boost::lockfree::queue? - Oleg Vazhnev