C++中的非阻塞线程安全队列是什么?

26

C++中是否有一个线程安全的非阻塞队列类?

可能是一个基础问题,但我已经有一段时间没有使用C++了...

编辑:删除STL的要求。


2
boost::lockfree::queue - Oleg Vazhnev
9个回答

19
假设您的CPU具有双指针宽的比较和交换功能(486或更高版本上的compxchg8b,大多数amd64机器上的compxchg16b [Intel早期型号上不存在])... 这里有一个算法 更新:如果您不怕做一些工作,将其翻译为C++并不难。 :P
此算法假定“带标记的指针”结构如下所示:
// Be aware that copying this structure has to be done atomically...

template <class T>
struct pointer
{
   T *ptr;
   uintptr_t tag;
};

如果您想用一些内联汇编包装指令lock cmpxchg{8|16}b,那么您可以这样编写队列节点:

template <class T>
struct queue_node
{
    T value;
    pointer<queue_node<T> > next;
};

其余部分或多或少是我链接算法的转录...

12
@jldupont说“-1”对此太过严厉了。Asveikau给出了一个很好的算法链接,将其翻译成C++仅需几分钟时间。在这里完全没有必要使用“-1”。 - Rom
1
这就是为什么我没有给他点踩,只是给了他一个标记。如果你看我的个人资料,你会发现我非常慷慨...你不会看到我给很多人点踩。 - jldupont
1
基于危险指针的队列是另一个选择 - 它允许将未使用的内存释放回操作系统,而不必永远在空闲列表中维护它。http://www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf 还可以查看Relacy以检查您的无锁算法是否正确:http://groups.google.com/group/relacy - Greg Rogers
12
如果你并没有真的点踩,为什么要说“-1”呢?这只会让人感到困惑。 :p - jalf
3
现在已经是2013年了,C++到现在为止还没有一个标准的线程安全队列库?有多少开发人员不得不反复编写同样的代码呢? - phreakhead

6

由于当前的C++标准甚至没有承认线程的存在,因此STL或标准库的任何其他部分肯定都不是线程安全的。


1
那好吧,STL 之外的呢? - jldupont
1
标准并不保证线程安全。但是真正的实现可以并且确实会提供自己的保证。 - Jerry Coffin

6

2

2

谢谢提供链接。看起来这会给我的项目增加另一个依赖项:boost库。是吗? - jldupont
3
没错,无论如何,如果你是一个C++开发者,最终都会使用Boost库。Nikolai N Fetissov有关boost::mutex的建议很好。 - KeatsPeeks
5
成为C++开发者并不总是意味着要使用Boost库。我知道这里似乎每个人都喜欢它,但许多项目并没有使用它(甚至禁用它)。看一下Chromium是如何实现的。 - i_am_jorf
政策问题,不多不少。 - KeatsPeeks
条件变量总是与互斥锁一起使用。这与问题要求的非阻塞队列不符。 - Jerry Coffin
@JerryCoffin的网络搜索似乎表明,“非阻塞队列”可以有至少两种不同的含义:(1)在实现中根本没有锁/阻塞,即无锁;(2)在空队列上调用dequeue()将返回一个特殊值,表示“空”,而不是等待队列变得非空。我猜这个答案假设了后者。不知道提问者想问的是哪一个。 - undefined

1

STL容器不是线程安全的,您应该为并发访问实现自己的处理方式。
有这个项目(C ++),旨在提供并发访问:CPH STL
相关论文


1
简短回答 - 不行。STL(标准模板库)不涉及并发(至少在规范层面上)。当前的C++标准也没有关于线程的说明。

不过,你可以很容易地在STL和Boost之上构建这样一个队列 - 只需在自定义类中包装std::queueboost::mutex即可。

Boost通常包含在流行的发行版中,例如Ubuntu、Fedora吗? - jldupont
我认为是这样,至少在我工作的地方,纯净的RHEL安装总是有旧版本的Boost库存在 /usr/include 目录下,这会造成一些问题。 - Nikolai Fetissov
请记住,原始问题是关于非阻塞队列的。互斥锁是一种阻塞的方式,因此不太可能符合要求。 - Jerry Coffin
你可以随时尝试使用try_lock函数并放弃 - 这里不会阻塞 :) - Nikolai Fetissov
“当前的C++标准” - 你能否根据C++11更新你的答案? - Craig McQueen

0

目前非官方的Boost.Lockfree是值得考虑的。我同时使用FIFO和原子类型。


0

非阻塞意味着不需要任何互斥量、信号量、临界区等。

对于单个生产者和单个消费者,也就是一个线程推送元素,另一个线程弹出元素,在我看来,像下面提供的循环队列就足够了。

这里是代码,以及其他可能性的注释。

  // --------- class: TNQueue
  // 
  // Implements a fixed size circular queue which is non-blocking and thread safe provided that
  //
  //    - there is only one producer, that is one thread that pushes into the queue
  //    - and only one consumer, a thread that pops the queue
  //    - producer has to handle fullness of the queue (queue will not block)
  //    - consumer has to handle emptiness of the queue (queue will not block)
  //
  // Safety: The queue is thread safe if the given conditions are met because the two separate threads changes 
  //         different pointers (push writeIndx and pop readIndx). Although they read both indexes in functions isFull()
  //         and isEmpty(), the evaluation is "conservative", that is, in worst case works well.
  // 
  // Counting: Counting elements is not possible in a not blocking and still thread safe way. For example
  //           a counter as member variable would have to be changed by both, push and pop. And a function
  //           computing the count using writeIndx and readIndx is not achievable, mainly because of the circular 
  //           nature of the queue: the function would need to use module (%) operation which may provide a nonsense 
  //           value if some of the counters change during the computation.
  //
  // Blocking with a counting semaphore: If blocking is required, that is the producer gets blocked when the queue is full
  //          and the consumer when the queue is empty. Then it can be implemented efficiently using a counting semaphore
  //          that reflects the number of elements in the queue. Then push will increase the semaphore (release) and the pop
  //          decrement it (wait). This solution provides the counter of elements (from the semaphore) and allow using 
  //          the N element of the storage instead of only N-1 (checking pointers is unnecessary)
  // 
  // multiple producers or consumers or both: if is needed to have multiple producers or multiple consumers or both, then
  //          a mutex for write and a mutex for read would allow that. Having a unique mutex for both operation is quite 
  //          inefficient, it would block in many unnecessary situations, for example the queue is almost empty but a producer
  //          pushing one elemnt blocks the queue for any reader.
  //
  // (*) disclaimer: it was difficult for me to find good information about the subject in internet, as an example I had to change
  //          the code of https://en.wikipedia.org/wiki/Circular_buffer#Circular_buffer_mechanics (so modification
  //          at 10:30 on 13 May 2023 Elxala, comes from me, that is same source as this code, be aware!)
  //          Also, as curiosity, a discussion with chatGPT about this queue provide wrong answers from chatGPT (not the version ChatGPT4).
  //          So, with that said, do not take the code and the notes as the truth but just as my strong opinion.
  //          If any of these notes requires correction, feedback would be appreciated.
  //
  template<typename T, int const N>
  class TNQueue
  {
  protected:
      
      int storage [N];    // Note: N-1 is the actual capacity, see isFull method
      int writeIndx;
      int readIndx;

  public:
     
     TNQueue ():
        writeIndx (0),
        readIndx (0)
     {
     }
     
     bool isFull ()
     {
        return (writeIndx + 1) % N == readIndx;
     }

     bool isEmpty ()
     {
        return readIndx == writeIndx;
     }

     bool push (T item) 
     {
        if (isFull ()) return false;
        
        storage[writeIndx] = item;
        writeIndx = (writeIndx + 1) % N;
        return true;
     }

     bool pop (T & item)
     {
        if (isEmpty ()) return false;
        item = storage[readIndx];
        readIndx = (readIndx + 1) % N;
        return true;
     }
  };

  int main ()
  {
    TNQueue<int, 15> cua;
    
    // basic test
    //
    int value = 1001;
    while (cua.push (value ++));
    while (cua.pop (value))
       printf ("read %d\n", value);
    return 0;
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接