Is there a multiple producer single consumer lock-free queue for C++?

30

The more I read the more confused I get... I had thought it would be trivial to find a formally correct MPSC queue in C++.

Every time I find another attempt at one, further research seems to turn up an ABA problem or some other subtle race condition.

Many people mention the need for garbage collection. That is something I want to avoid.

Is there an accepted, correct, open-source implementation out there?


My suggestion would be to look for alternatives. Lock-free queues are not great for performance: nothing can be made fast when multiple threads write to the same cache line. Using a separate SPSC queue per producer is much faster. The downside is that you lose ordering of the items. If you need an ordered queue and want to avoid a lot of trouble, use a spinlock; in practice it is nearly on par with lock-free, and hundreds of times faster than e.g. a Win32 critical section. - Timo
Added an answer with a link to the disruptor. I don't know if you will like my implementation; it is certainly simpler than the "real thing", but it is not portable and has a few obvious code dependencies not attached. Best to treat it as an illustration of what is described above :) - bronekk
@bronekk thanks - I will definitely study your example closely this weekend! - Steve Lorimer
What about the Boost queue? http://www.boost.org/doc/libs/1_54_0/doc/html/lockfree/examples.html - Oleg Vazhnev
@javapowered Indeed, Boost's lock-free queue is a perfect fit. It didn't exist when I asked this question, but I have since started using it. - Steve Lorimer
4 Answers

12
You may want to check out the disruptor; a C++ port is available here: http://lmax-exchange.github.io/disruptor/
You can also find an explanation on Stack Overflow of how it works. Basically it is a circular buffer with no locking, optimized for passing FIFO messages between threads in fixed-size slots.
Here are two implementations which I found useful: Lock-free Multi-producer Multi-consumer Queue on the NatSys Lab. Blog, and Yet another implementation of a lock-free circular array queue on CodeProject.
Note: the code below is incorrect; I leave it here only as an example of how tricky these things can be.

If you do not like the complexity of the Google version, here is something similar from me. It is much simpler, but it is left as an exercise to the reader to make it work (it is part of a larger project, and not portable at the moment). The whole idea is to maintain a circular buffer for data and a small set of counters to identify the slots being written / written and being read / read. Because each counter is in its own cache line, and (normally) each is atomically updated only once in the life of a message, they can all be read without any synchronization. The one potential contention point, between writing threads in post_done, exists to guarantee FIFO. The counters (head_, wrtn_, rdng_, tail_) were selected to guarantee correctness and FIFO, so dropping FIFO would also require a change of counters (and that might be difficult to do without sacrificing correctness). It is possible to improve performance slightly in scenarios with a single consumer, but I would not bother; it would have to be undone if other use cases with multiple readers were found.
On my machine, latency looks like the following (percentile on the left, average within that percentile on the right, the unit is microseconds, measured by rdtsc):
    total=1000000 samples, avg=0.24us
    50%=0.214us, avg=0.093us
    90%=0.23us, avg=0.151us
    99%=0.322us, avg=0.159us
    99.9%=15.566us, avg=0.173us

These results are for a single polling consumer, i.e. a worker thread calling wheel.read() in a tight loop and checking for empty (scroll down for an example). A waiting consumer (much lower CPU utilization) would wait on an event (one of the acquire... functions), which adds about 1-2 microseconds to the average latency due to the context switch.
Since there is very little contention on reads, consumers scale very well with the number of worker threads, e.g. with 3 threads on my machine:
    total=1500000 samples, avg=0.07us
    50%=0us, avg=0us
    90%=0.155us, avg=0.016us
    99%=0.361us, avg=0.038us
    99.9%=8.723us, avg=0.044us

Patches are welcome :)
// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <core/api.hxx>
#include <core/wheel/exception.hxx>

#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>

namespace core { namespace wheel
{
  struct bad_size : core::exception
  {
    template<typename T> explicit bad_size(const T&, size_t m)
      : core::exception(std::string("Slot capacity exceeded, sizeof(")
                  + typeid(T).name()
                  + ") = "
                  + boost::lexical_cast<std::string>(sizeof(T))
                  + ", capacity = "
                  + boost::lexical_cast<std::string>(m)
                  )
    {}
  };        

  // inspired by Disruptor
  template <typename Header>
  class wheel : boost::noncopyable
  {
    __declspec(align(64))
    struct slot_detail
    {
      // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
      // slot read:  (memory barrier in wheel) > read_done > (memory barrier in wheel)

      // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
      template <bool Writing>
      void done(wheel* w)
      {
        if (Writing)
          w->post_done(sequence);
        else
          w->read_done();
      }

      // cache line for sequence number and header
      long long sequence;
      Header header;

      // there is no such thing as data type with variable size, but we need it to avoid thrashing
      // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
      // This is well into UB territory! Using template parameter for this is not good, since it
      // results in this small implementation detail leaking to all possible user interfaces.
      __declspec(align(8))
      char data[8];
    };

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment
    __declspec(align(64))
    struct slot_block { long long padding[8]; };

  public:
    // wrap slot data to outside world
    template <bool Writable>
    class slot
    {
      template<typename> friend class wheel;

      slot& operator=(const slot&); // moveable but non-assignable

      // may only be constructed by wheel
      slot(slot_detail* impl, wheel<Header>* w, size_t c)
        : slot_(impl) , wheel_(w) , capacity_(c)
      {}

    public:
      slot(slot&& s)
        : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
      {
        s.slot_ = NULL;
      }

      ~slot()
      {
        if (slot_)
        {
          slot_->done<Writable>(wheel_);
        }
      }

      // slot accessors - use Header to store information on what type is actually stored in data
      bool empty() const          { return !slot_; }
      long long sequence() const  { return slot_->sequence; }
      Header& header()            { return slot_->header; }
      char* data()                { return slot_->data; }

      template <typename T> T& cast()
      {
        static_assert(boost::is_pod<T>::value, "Data type must be POD");
        if (sizeof(T) > capacity_)
          throw bad_size(T(), capacity_);
        if (empty())
          throw no_data();
        return *((T*) data());
      }

    private:
      slot_detail*    slot_;
      wheel<Header>*  wheel_;
      const size_t    capacity_;
    };

  private:
    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks
    static size_t sizeof_slot(size_t s)
    {
      size_t m = sizeof(slot_detail);
      // add capacity less 8 bytes already within sizeof(slot_detail)
      m += max(8, s) - 8;
      // round up to 64 bytes, i.e. alignment of slot_detail
      size_t r = m & ~(unsigned int)63;
      if (r < m)
        r += 64;
      r /= 64;
      return r;
    }

    // calculate actual slot capacity back from number of 64 byte blocks
    static size_t slot_capacity(size_t s)
    {
      return s*64 - sizeof(slot_detail) + 8;
    }

    // round up to power of 2
    static size_t round_size(size_t s)
    {
      // enforce minimum size
      if (s <= min_size)
        return min_size;

      // find rounded value
      --s;
      size_t r = 1;
      while (s)
      {
        s >>= 1;
        r <<= 1;
      };
      return r;
    }

    slot_detail& at(long long sequence)
    {
      // find index from sequence number and return slot at found index of the wheel
      return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
    }

  public:
    wheel(size_t capacity, size_t size)
      : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
      , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
    {
      static_assert(boost::is_pod<Header>::value, "Header type must be POD");
      static_assert(sizeof(slot_block) == 64, "This was unexpected");

      wheel_ = new slot_block[size_ * blocks_];
      // all slots must be initialised to 0
      memset(wheel_, 0, size_ * 64 * blocks_);
      active_ = 1;
    }

    ~wheel()
    {
      stop();
      delete[] wheel_;
    }

    // all accessors needed
    size_t capacity() const { return capacity_; }   // capacity of a single slot
    size_t size() const     { return size_; }       // number of slots available
    size_t queue() const    { return (size_t)head_ - (size_t)tail_; }
    bool active() const     { return active_ == 1; }

    // enough to call it just once, to fine tune slot capacity
    template <typename T>
    void check() const
    {
      static_assert(boost::is_pod<T>::value, "Data type must be POD");
      if (sizeof(T) > capacity_)
        throw bad_size(T(), capacity_);
    }

    // stop the wheel - safe to execute many times
    size_t stop()
    {
      InterlockedExchange(&active_, 0);
      // must wait for current read to complete
      while (rdng_ != tail_)
        Sleep(10);

      return size_t(head_ - tail_);
    }

    // return first available slot for write
    slot<true> post()
    {
      if (!active_)
        throw stopped();

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          throw overflowing();

        // protection against case of race condition when we are overflowing
        // and two or more threads try to post and two or more messages are read,
        // all at the same time. If this happens we must re-try, otherwise we
        // could have skipped a sequence number - causing infinite wait in post_done
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // return first available slot for write, nothrow variant
    slot<true> post(std::nothrow_t)
    {
      if (!active_)
        return slot<true>(NULL, this, capacity_);

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          return slot<true>(NULL, this, capacity_);

        // must retry if race condition described above
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // read first available slot for read
    slot<false> read()
    {
      slot_detail* r = NULL;
      // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
      if (active_ && rdng_ < wrtn_)
      {
        // the only memory barrier on reading seq. number we need
        const long long h = InterlockedIncrement64(&rdng_);
        // check if this slot has been written, step back if not
        if (h > wrtn_)
          InterlockedDecrement64(&rdng_);
        else
          r = &at(h);
      }

      // wrap in readable slot
      return slot<false>(r , this, capacity_);
    }

    // waiting for new post, to be used by non-polling clients
    void acquire()
    {
      event_.acquire();
    }

    bool try_acquire()
    {
      return event_.try_acquire();
    }

    bool try_acquire(unsigned long timeout)
    {
      return event_.try_acquire(timeout);
    }

    void release()
    {}

  private:
    void post_done(long long sequence)
    {
      const long long t = sequence - 1;

      // the only memory barrier on written seq. number we need
      while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
        Sleep(0);

      // this is outside of critical path for polling clients
      event_.set();
    }

    void read_done()
    {
      // the only memory barrier on tail seq. number we need
      InterlockedIncrement64(&tail_);
    }

    // each in its own cache line
    // head_ - wrtn_ = no. of messages being written at this moment
    // rdng_ - tail_ = no. of messages being read at the moment
    // head_ - tail_ = no. of messages to read (including those being written and read)
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
    __declspec(align(64)) volatile long long head_; // currently writing or written
    __declspec(align(64)) volatile long long wrtn_; // written
    __declspec(align(64)) volatile long long rdng_; // currently reading or read
    __declspec(align(64)) volatile long long tail_; // read
    __declspec(align(64)) volatile long active_;    // flag switched to 0 when stopped

    __declspec(align(64))
    api::event event_;          // set when new message is posted
    const size_t blocks_;       // number of 64-byte blocks in a single slot_detail
    const size_t capacity_;     // capacity of data() section per single slot. Initialisation depends on blocks_
    const size_t size_;         // number of slots available, always power of 2
    slot_block* wheel_;
  };
}}

Here is an example of a polling consumer worker thread:

  while (wheel.active())
  {
    core::wheel::wheel<int>::slot<false> slot = wheel.read();
    if (!slot.empty())
    {
      Data& d = slot.cast<Data>();
      // do work
    }
    // uncomment below for waiting consumer, saving CPU cycles
    // else
    //   wheel.try_acquire(10);
  }

EDIT: added consumer example


Could you please explain what Header and Data are, and what the difference between them is? If I wanted to store 3 64-bit words in every slot (i.e. the whole payload), how would I use them? - Steve Lorimer

4

The most suitable implementation depends on the desired properties of the queue. Should it be unbounded, or is bounded OK? Should it be linearizable, or are less strict requirements OK? How strong a FIFO guarantee do you need? Are you willing to pay the cost of having the consumer restore the list order (there exists a very simple implementation where the consumer grabs the tail of a singly-linked list, thus obtaining at once all the items put there by producers so far)? Should it guarantee that no thread is ever blocked, or is a tiny chance of some thread getting blocked OK? Etc.

Some useful links:
Is multiple-producer, single-consumer possible in a lockfree setting?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3

Hope this helps.


1
I created a version based on Dmitry Vyukov's implementation: https://github.com/samanbarghi/MPSCQ/ - Saman Barghi

0
Below is the technique I used for my cooperative multi-tasking / multi-threading library (MACE) http://bytemaster.github.com/mace/. It has the benefit of being lock-free except when the queue is empty.
struct task {
   boost::function<void()> func;
   task* next;
};


boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue;

// this can be called from any thread
void thread::post_task( task* t ) {
     // atomically post the task to the queue.
     task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
     do { t->next = stale_head;
     } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Because only one thread can post the 'first task', only that thread will attempt
   // to aquire the lock and therefore there should be no contention on this lock except
   // when *this thread is about to block on a wait condition.  
    if( !stale_head ) { 
        boost::unique_lock<boost::mutex> lock(task_ready_mutex);
        task_ready.notify_one();
    }
}

// this is the consumer thread.
void process_tasks() {
  while( !done ) {
   // this will atomically pop everything that has been posted so far.
   pending = task_in_queue.exchange(0,boost::memory_order_consume);
   // pending is a linked list in 'reverse post order', so process them
   // from tail to head if you want to maintain order.

   if( !pending ) { // lock scope
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);                
      // check one last time while holding the lock before blocking.
      if( !task_in_queue ) task_ready.wait( lock );
   }
 }
}

I think it should be pending = task_in_queue.exchange(0, boost::memory_order_acquire); since the ISO C++11 standard states in 29.3.2: "An atomic operation A that performs a release operation on an atomic object M synchronizes with an atomic operation B that performs an acquire operation on M and takes its value from any side effect in the release sequence headed by A." - ipapadop
I would not call it lock-free when the "queue" is not empty, due to the memory allocation/reclamation. - Shmil The Cat

-1
My guess is that no such thing exists - and even if it did, it would either not be portable or not be open source.
Conceptually, you are trying to control two pointers at the same time: the tail pointer and the tail->next pointer. That generally cannot be done with lock-free primitives alone.

1
Your guess is incorrect. The producer only needs to move the tail. What you are describing is an intrusive queue, and in that case you update tail->next and then atomically move the tail; if that does not succeed, loop again. - edwinc
