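The more I read, the more confused I get... I would have thought that finding a formally correct MPSC queue implemented in C++ would be trivial.
Every time I find another attempt at one, further research seems to suggest that it suffers from ABA or some other subtle race condition.
Many people talk about the need for garbage collection. That is something I want to avoid.
Is there a widely accepted, correct open-source implementation?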
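[...] between the writer threads in `post_done`; this is needed to guarantee FIFO. The counters (`head_`, `wrtn_`, `rdng_`, `tail_`) were chosen to ensure both correctness and FIFO, so giving up FIFO would also mean changing the counters (which may be hard to do without sacrificing correctness). For the single-consumer scenario performance could be improved slightly, but I would not bother: the change would have to be undone as soon as another use case with multiple readers turned up.
total=1000000 samples, avg=0.24us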
50%=0.214us, avg=0.093us
90%=0.23us, avg=0.151us
99%=0.322us, avg=0.159us
99.9%=15.566us, avg=0.173us
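To make the writer-side contention in `post_done` easier to see, here is a minimal portable sketch of the same in-order publish step. It uses `std::atomic` instead of the Win32 `Interlocked*` calls. The names `sequencer`, `claim`, `publish`, and `run_writers` are made up for this illustration and are not part of the wheel class.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-ins for the wheel's head_ and wrtn_ counters.
struct sequencer {
    std::atomic<long long> head{0};  // last sequence number claimed by a writer
    std::atomic<long long> wrtn{0};  // last sequence number published to readers

    // Claim the next sequence number (the role of InterlockedIncrement64(&head_)).
    long long claim() {
        return head.fetch_add(1, std::memory_order_relaxed) + 1;
    }

    // Publish strictly in sequence order: writer N spins until writer N-1
    // has published. This wait is where writers contend with each other.
    void publish(long long seq) {
        long long expected = seq - 1;
        while (!wrtn.compare_exchange_weak(expected, seq,
                                           std::memory_order_release,
                                           std::memory_order_relaxed)) {
            expected = seq - 1;         // a failed CAS stores the observed value here
            std::this_thread::yield();  // plays the role of Sleep(0)
        }
    }
};

// Start `writers` threads, each claiming and publishing `per_writer`
// sequence numbers; returns the final value of the published counter.
long long run_writers(int writers, int per_writer) {
    sequencer s;
    std::vector<std::thread> pool;
    for (int i = 0; i < writers; ++i)
        pool.emplace_back([&s, per_writer] {
            for (int n = 0; n < per_writer; ++n)
                s.publish(s.claim());
        });
    for (auto& t : pool)
        t.join();
    return s.wrtn.load(std::memory_order_acquire);
}
```

Because a sequence number can only be published after its predecessor, readers never observe a gap, which is what preserves FIFO. The price is that a slow writer delays every writer behind it.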
[...] one of the `acquire...` functions), the average latency increases by roughly 1-2 microseconds because of the context switch.
total=1500000 samples, avg=0.07us
50%=0us, avg=0us
90%=0.155us, avg=0.016us
99%=0.361us, avg=0.038us
99.9%=8.723us, avg=0.044us
// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <core/api.hxx>
#include <core/wheel/exception.hxx>
#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>
namespace core { namespace wheel
{
struct bad_size : core::exception
{
template<typename T> explicit bad_size(const T&, size_t m)
: core::exception(std::string("Slot capacity exceeded, sizeof(")
+ typeid(T).name()
+ ") = "
+ boost::lexical_cast<std::string>(sizeof(T))
+ ", capacity = "
+ boost::lexical_cast<std::string>(m)
)
{}
};
// inspired by Disruptor
template <typename Header>
class wheel : boost::noncopyable
{
__declspec(align(64))
struct slot_detail
{
// slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
// slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel)
// done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
template <bool Writing>
void done(wheel* w)
{
if (Writing)
w->post_done(sequence);
else
w->read_done();
}
// cache line for sequence number and header
long long sequence;
Header header;
// there is no such thing as data type with variable size, but we need it to avoid thrashing
// cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
// This is well into UB territory! Using template parameter for this is not good, since it
// results in this small implementation detail leaking to all possible user interfaces.
__declspec(align(8))
char data[8];
};
// use this as a storage space for slot_detail, to guarantee 64 byte alignment
__declspec(align(64))
struct slot_block { long long padding[8]; };
public:
// wrap slot data to outside world
template <bool Writable>
class slot
{
template<typename> friend class wheel;
slot& operator=(const slot&); // moveable but non-assignable
// may only be constructed by wheel
slot(slot_detail* impl, wheel<Header>* w, size_t c)
: slot_(impl) , wheel_(w) , capacity_(c)
{}
public:
slot(slot&& s)
: slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
{
s.slot_ = NULL;
}
~slot()
{
if (slot_)
{
slot_->done<Writable>(wheel_);
}
}
// slot accessors - use Header to store information on what type is actually stored in data
bool empty() const { return !slot_; }
long long sequence() const { return slot_->sequence; }
Header& header() { return slot_->header; }
char* data() { return slot_->data; }
template <typename T> T& cast()
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
if (empty())
throw no_data();
return *((T*) data());
}
private:
slot_detail* slot_;
wheel<Header>* wheel_;
const size_t capacity_;
};
private:
// dynamic size of slot, with extra capacity, expressed in 64 byte blocks
static size_t sizeof_slot(size_t s)
{
size_t m = sizeof(slot_detail);
// add capacity less 8 bytes already within sizeof(slot_detail)
m += max(8, s) - 8;
// round up to 64 bytes, i.e. alignment of slot_detail
size_t r = m & ~(size_t)63; // mask in size_t so the upper bits survive on 64-bit builds
if (r < m)
r += 64;
r /= 64;
return r;
}
// calculate actual slot capacity back from number of 64 byte blocks
static size_t slot_capacity(size_t s)
{
return s*64 - sizeof(slot_detail) + 8;
}
// round up to power of 2
static size_t round_size(size_t s)
{
// enforce minimum size
if (s <= min_size)
return min_size;
// find rounded value
--s;
size_t r = 1;
while (s)
{
s >>= 1;
r <<= 1;
}
return r;
}
slot_detail& at(long long sequence)
{
// find index from sequence number and return slot at found index of the wheel
return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
}
public:
wheel(size_t capacity, size_t size)
: head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
, blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
{
static_assert(boost::is_pod<Header>::value, "Header type must be POD");
static_assert(sizeof(slot_block) == 64, "This was unexpected");
wheel_ = new slot_block[size_ * blocks_];
// all slots must be initialised to 0
memset(wheel_, 0, size_ * 64 * blocks_);
active_ = 1;
}
~wheel()
{
stop();
delete[] wheel_;
}
// all accessors needed
size_t capacity() const { return capacity_; } // capacity of a single slot
size_t size() const { return size_; } // number of slots available
size_t queue() const { return (size_t)head_ - (size_t)tail_; }
bool active() const { return active_ == 1; }
// enough to call it just once, to fine tune slot capacity
template <typename T>
void check() const
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
}
// stop the wheel - safe to execute many times
size_t stop()
{
InterlockedExchange(&active_, 0);
// must wait for current read to complete
while (rdng_ != tail_)
Sleep(10);
return size_t(head_ - tail_);
}
// return first available slot for write
slot<true> post()
{
if (!active_)
throw stopped();
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
throw overflowing();
// protection against case of race condition when we are overflowing
// and two or more threads try to post and two or more messages are read,
// all at the same time. If this happens we must re-try, otherwise we
// could have skipped a sequence number - causing infinite wait in post_done
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// return first available slot for write, nothrow variant
slot<true> post(std::nothrow_t)
{
if (!active_)
return slot<true>(NULL, this, capacity_);
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
return slot<true>(NULL, this, capacity_);
// must retry if race condition described above
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// read first available slot for read
slot<false> read()
{
slot_detail* r = NULL;
// compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
if (active_ && rdng_ < wrtn_)
{
// the only memory barrier on reading seq. number we need
const long long h = InterlockedIncrement64(&rdng_);
// check if this slot has been written, step back if not
if (h > wrtn_)
InterlockedDecrement64(&rdng_);
else
r = &at(h);
}
// wrap in readable slot
return slot<false>(r , this, capacity_);
}
// waiting for new post, to be used by non-polling clients
void acquire()
{
event_.acquire();
}
bool try_acquire()
{
return event_.try_acquire();
}
bool try_acquire(unsigned long timeout)
{
return event_.try_acquire(timeout);
}
void release()
{}
private:
void post_done(long long sequence)
{
const long long t = sequence - 1;
// the only memory barrier on written seq. number we need
while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
Sleep(0);
// this is outside of critical path for polling clients
event_.set();
}
void read_done()
{
// the only memory barrier on tail seq. number we need
InterlockedIncrement64(&tail_);
}
// each in its own cache line
// head_ - wrtn_ = no. of messages being written at this moment
// rdng_ - tail_ = no. of messages being read at the moment
// head_ - tail_ = no. of messages to read (including those being written and read)
// wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
__declspec(align(64)) volatile long long head_; // currently writing or written
__declspec(align(64)) volatile long long wrtn_; // written
__declspec(align(64)) volatile long long rdng_; // currently reading or read
__declspec(align(64)) volatile long long tail_; // read
__declspec(align(64)) volatile long active_; // flag switched to 0 when stopped
__declspec(align(64))
api::event event_; // set when new message is posted
const size_t blocks_; // number of 64-byte blocks in a single slot_detail
const size_t capacity_; // capacity of data() section per single slot. Initialisation depends on blocks_
const size_t size_; // number of slots available, always power of 2
slot_block* wheel_;
};
}}
Here is an example of a polling consumer worker thread:
while (wheel.active())
{
core::wheel::wheel<int>::slot<false> slot = wheel.read();
if (!slot.empty())
{
Data& d = slot.cast<Data>();
// do work
}
// uncomment below for waiting consumer, saving CPU cycles
// else
// wheel.try_acquire(10);
}
EDIT: added the consumer example.
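The wheel keeps its slot count at a power of two so that a sequence number can be mapped to a slot with a single mask (`sequence & (size_ - 1)`) rather than a modulo. The sketch below isolates those two pieces. The function names and the default `min_size` of 4 are assumptions for this illustration; the original `min_size` constant is not shown in the listing.

```cpp
#include <cassert>
#include <cstddef>

// Round s up to the next power of two, never going below min_size.
// min_size is assumed to be a power of two itself (hypothetical default).
inline std::size_t round_up_pow2(std::size_t s, std::size_t min_size = 4) {
    if (s <= min_size)
        return min_size;
    --s;                 // so that exact powers of two are not doubled
    std::size_t r = 1;
    while (s) {
        s >>= 1;
        r <<= 1;
    }
    return r;
}

// Map an ever-growing sequence number onto a ring of `size` slots.
// Only valid when size is a power of two.
inline std::size_t ring_index(long long sequence, std::size_t size) {
    assert(size != 0 && (size & (size - 1)) == 0);
    return static_cast<std::size_t>(sequence) & (size - 1);
}
```

With a ring of 8 slots, sequence numbers 8 and 16 both land in slot 0, so a writer may only reuse a slot once the reader has moved past it. That is the condition the `h - size_ > tail_` check in `post()` guards.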
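The most suitable implementation depends on the properties you want from the queue. Should it be unbounded, or is a bounded one fine? Should it be linearizable, or would weaker requirements do? How strong a FIFO guarantee do you need? Are you willing to pay the cost of list reversal on the consumer side (there is a very simple implementation in which the consumer grabs the tail of a singly linked list and so obtains, in one step, every item the producers have put there so far)? Must it guarantee that no thread is ever blocked, or is a tiny chance of some thread being blocked acceptable? And so on.
Some useful links:
Is multiple-producer, single-consumer possible in a lock-free setting?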
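The "very simple implementation" mentioned in the parenthesis above can be sketched roughly as follows, using `std::atomic`. The names `node`, `mpsc_stack_queue`, `pop_all`, and `drain_demo` are invented for this illustration, and the caller is assumed to own the nodes, so no garbage collection is involved.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

struct node {
    int value;
    node* next;
};

class mpsc_stack_queue {
    std::atomic<node*> head_{nullptr};
public:
    // Any producer thread: link the node in front of the current head.
    void push(node* n) {
        n->next = head_.load(std::memory_order_relaxed);
        // On failure the CAS reloads the current head into n->next, so just retry.
        while (!head_.compare_exchange_weak(n->next, n,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // Consumer thread only: detach the whole list at once, then reverse it,
    // because the detached list is in newest-first order.
    node* pop_all() {
        node* list = head_.exchange(nullptr, std::memory_order_acquire);
        node* fifo = nullptr;
        while (list) {
            node* next = list->next;
            list->next = fifo;
            fifo = list;
            list = next;
        }
        return fifo;
    }
};

// Single-threaded demonstration of the ordering.
std::vector<int> drain_demo() {
    mpsc_stack_queue q;
    node a{1, nullptr}, b{2, nullptr}, c{3, nullptr};
    q.push(&a);
    q.push(&b);
    q.push(&c);
    std::vector<int> out;
    for (node* n = q.pop_all(); n; n = n->next)
        out.push_back(n->value);
    return out;
}
```

Since the consumer never pops individual nodes with a CAS and only swaps the whole list out, the usual ABA hazard of a lock-free stack pop does not arise in this sketch. The trade-offs are the reversal cost and an unbounded queue.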
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3
Hope that helps.
struct task {
boost::function<void()> func;
task* next;
};
boost::mutex task_ready_mutex;
boost::condition_variable task_ready;
boost::atomic<task*> task_in_queue;
// this can be called from any thread
void thread::post_task( task* t ) {
// atomically post the task to the queue.
task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
do { t->next = stale_head;
} while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
// Because only one thread can post the 'first task', only that thread will attempt
// to acquire the lock, so there should be no contention on this lock except
// when the consumer thread is about to block on the wait condition.
if( !stale_head ) {
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
task_ready.notify_one();
}
}
// this is the consumer thread.
void process_tasks() {
while( !done ) {
// this will atomically pop everything that has been posted so far.
task* pending = task_in_queue.exchange(0,boost::memory_order_consume);
// pending is a linked list in 'reverse post order', so process them
// from tail to head if you want to maintain order.
if( !pending ) { // lock scope
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
// check one last time while holding the lock before blocking.
if( !task_in_queue ) task_ready.wait( lock );
}
}