无需复制且线程安全的大数组环形缓冲区

5
针对大数组(10^7元素)的信号处理,我使用不同的线程连接环形缓冲区。不幸的是,太多的时间都用于将数据复制到和复制出缓冲区。当前实现基于boost::lockfree::spsc_queue。
因此,我正在寻找一种解决方案,通过使用unique_ptr来交换向量在线程和缓冲区之间的所有权(请参见附加的图纸:在线程和队列之间交换指针)。
移动智能指针并不符合我的需求,因为这样我需要不断在运行时为新的向量元素分配内存。这个开销比数据传输大得多。
在这个设计中,我是否忽略了一些缺陷?
是否有线程安全或甚至无锁的环形缓冲区实现,允许推入和弹出的交换操作?
编辑:我修改了一个锁定的环形缓冲区来交换 unique_ptr。性能提升很大。虽然它不像一个优雅的解决方案。有什么建议吗?
// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp

#include <memory>
#include <mutex>

template <typename T, int SIZE>
class RingbufferPointer {
typedef std::unique_ptr<T> TPointer;
public:
    explicit RingbufferPointer() {
        // create objects
        for (int i=0; i<SIZE; i++) {
            buf_[i] = std::make_unique<T>();
        }
    }

    bool push(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (full())
            return false;

        std::swap(buf_[head_], item);

        if (full_)
            tail_ = (tail_ + 1) % max_size_;

        head_ = (head_ + 1) % max_size_;
        full_ = head_ == tail_;

        return true;
    }

    bool pop(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (empty())
            return false;

        std::swap(buf_[tail_], item);

        full_ = false;
        tail_ = (tail_ + 1) % max_size_;

        return true;
    }

    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        head_ = tail_;
        full_ = false;
    }

    bool empty() const {
        return (!full_ && (head_ == tail_));
    }

    bool full() const {
        return full_;
    }

    int capacity() const {
        return max_size_;
    }

    int size() const {
        int size = max_size_;

        if(!full_) {
            if(head_ >= tail_)
                size = head_ - tail_;
            else
                size = max_size_ + head_ - tail_;
        }

        return size;
    }

private:
    TPointer buf_[SIZE];

    std::mutex mutex_;
    int head_ = 0;
    int tail_ = 0;
    const int max_size_ = SIZE;
    bool full_ = 0;
};

1
提到的boost队列在推入和弹出元素方面非常严格,据我所知不允许获取一个元素、处理它并释放它以进行覆盖。也许你将池化方法与Mike的回答混淆了。https://dev59.com/bq7la4cB1Zd3GeqPZDI4#52204403 - JackGrinningCat
我不明白你为什么需要在循环缓冲区内交换指针或元素。你是单生产者和消费者吗?如果是,为什么生产者不能直接读取数据到写入位置,而消费者则直接从读取位置处理数据呢? - RbMm
你是否需要按照FIFO顺序精确处理项目?无论如何,这可以通过无锁代码完成,而不需要任何互斥体。 - RbMm
@RdMm:是的,只有SPSC并按FIFO顺序。你说的直接写入FIFO是什么意思?这样能保证线程安全吗?如果消费者在写入时想要访问写入位置会发生什么?我该如何阻止该访问但仍获得线程安全甚至无锁队列? - luxderfux
@luxderfux - 理解了,在FIFO顺序中可能实现SPSC。线程安全且无锁,不需要任何互斥体。 - RbMm
4个回答

1

如果我正确理解你的任务 - 你需要2个容器:

  • 线程安全且无锁的自由元素池 - 以避免每次分配/释放。推送和弹出是无等待的。
  • 线程安全且无锁的单写/单读 FIFO 队列,推送和弹出是无等待的。

这样你就可以做到:

  • 在开始时,你分配 N 个元素并将其推入池中。
  • 生产者从池中弹出空闲项(而不是分配内存)
  • 准备项目数据
  • 将其推送到 FIFO 队列中
  • 如果池中没有可用的空闲项 - 等待消费者信号

  • 消费者从 FIFO 队列中弹出项目
  • 处理项目数据
  • 将项目推回池中(而不是释放其内存)
  • 如果队列为空-等待生产者发出信号

FIFO队列可以这样实现:

FIFO队列可以按照以下方式实现:

class CyclicBufer
{
    struct alignas(8) Position 
    {
        ULONG _begin, _data_size;
    };

    std::atomic<Position> _pos;
    void** _items;

    ULONG _buf_size;

public:

    // Requires: only one thread is allowed to push data to the CyclicBufer
    bool push(void* item, bool* bWasEmpty = 0);

    // Requires: only one thread is allowed to pop data to the CyclicBufer
    bool pop(void** pitem, bool* bNotEmpty = 0);

    ~CyclicBufer()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    CyclicBufer() : _items(0), _buf_size(0)
    {
        _pos._My_val._begin = 0, _pos._My_val._data_size = 0;
    }

    bool create(ULONG buf_size)
    {
        if (_items = new(std::nothrow) void*[buf_size])
        {
            _buf_size = buf_size;
            return true;
        }

        return false;
    }

    bool is_empty()
    {
        Position current_pos = _pos.load(std::memory_order_relaxed);

        return !current_pos._data_size;
    }
};

bool CyclicBufer::push(void* item, bool* bWasEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_relaxed);

    if (current_pos._data_size >= _buf_size) return false;

    // (_pos._begin + _pos._data_size) % _buf_size never changed in pop
    _items[(current_pos._begin + current_pos._data_size) % _buf_size] = item;

    for (;;)
    {
        Position new_pos = {
            current_pos._begin, current_pos._data_size + 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_release))
        {
            if (bWasEmpty) *bWasEmpty = current_pos._data_size == 0;
            return true;
        }
    }
}

bool CyclicBufer::pop(void** pitem, bool* bNotEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_acquire);

    if (!current_pos._data_size) return false;

    // current_pos._begin never changed in push
    void* item = _items[current_pos._begin];

    for (;;)
    {
        Position new_pos = {
            (current_pos._begin + 1) % _buf_size, current_pos._data_size - 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_relaxed))
        {
            if (bNotEmpty) *bNotEmpty = new_pos._data_size != 0;
            *pitem = item;
            return true;
        }
    }
}

在Windows上实现线程安全和无锁池,可以使用实现中的InterlockedPushEntrySListInterlockedPopEntrySList,但当然也可以自己实现此API:

struct list_entry {
    list_entry *Next;
};

#if defined(_M_X64) || defined(_M_ARM64)
#define MACHINE_64
#endif

struct alignas(sizeof(PVOID)*2) list_head 
{  
    union {
        struct {
            INT_PTR DepthAndSequence;
            union {
                list_entry* NextEntry;
                INT_PTR iNextEntry;
            };
        };
        __int64 value; // for 32-bit only
    };

    void init()
    {
        iNextEntry = 0, DepthAndSequence = 0;
    }

    bool push(list_entry* entry)
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            entry->Next = current.NextEntry;
            new_head.NextEntry = entry;
            new_head.DepthAndSequence = current.DepthAndSequence + 0x10001;

#ifdef MACHINE_64
            if (_INTRIN_RELEASE(_InterlockedCompareExchange128)(
                &DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                // return is list was empty before push
                return !current.NextEntry;
            }
#else
            new_head.value = _INTRIN_RELEASE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                // return is list was empty before push
                return !current.NextEntry;
            }

            current.value = new_head.value;
#endif
        }
    }

    list_entry* pop()
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            list_entry* entry = current.NextEntry;

            if (!entry)
            {
                return 0;
            }

            // entry must be valid memory
            new_head.NextEntry = entry->Next;
            new_head.DepthAndSequence = current.DepthAndSequence - 1;

#ifdef MACHINE_64
            if (_INTRIN_ACQUIRE(_InterlockedCompareExchange128)(&DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                return entry;
            }
#else
            new_head.value = _INTRIN_ACQUIRE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                return entry;
            }

            current.value = new_head.value;
#endif
        }
    }
};

#pragma warning(disable : 4324)

template <class _Ty>
class FreeItems : list_head
{
    void* _items;

    union Chunk {
        list_entry entry;
        char buf[sizeof(_Ty)];
    };

public:

    ~FreeItems()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    FreeItems() : _items(0)
    {
        init();
    }

    bool create(ULONG count)
    {
        if (Chunk* items = new(std::nothrow) Chunk[count])
        {
            _items = items;

            union {
                list_entry* entry;
                Chunk* item;
            };

            item = items;

            do 
            {
                list_head::push(entry);

            } while (item++, --count);

            return true;
        }

        return false;
    }

    _Ty* pop()
    {
        return (_Ty*)list_head::pop();
    }

    bool push(_Ty* item)
    {
        return list_head::push((list_entry*)item);
    }
};

使用这两个容器的演示/测试代码可以看起来像这样(Windows的代码,但主要是我们如何使用池和队列)。
struct BigData 
{
    ULONG _id;
};

struct CPData : CyclicBufer, FreeItems<BigData>
{
    HANDLE _hDataEvent, _hFreeEvent, _hConsumerStop, _hProducerStop;

    ULONG _waitReadId, _writeId, _setFreeCount, _setDataCount;

    std::_Atomic_integral_t _dwRefCount;
    bool _bStop;

    static ULONG WINAPI sProducer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Producer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Producer()
    {
        HANDLE Handles[] = { _hProducerStop, _hFreeEvent  };

        for (;;)
        {
            BigData* item;

            while (!_bStop && (item = FreeItems::pop()))
            {
                // init data item
                item->_id = _writeId++;

                bool bWasEmpty;

                if (!CyclicBufer::push(item, &bWasEmpty)) __debugbreak();

                if (bWasEmpty)
                {
                    _setDataCount++;
                    SetEvent(_hDataEvent);
                }
            }

            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                SetEvent(_hConsumerStop);
                return;
            case WAIT_OBJECT_0 + 1:
                break;
            default:
                __debugbreak();
            }
        }
    }

    static ULONG WINAPI sConsumer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Consumer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Consumer()
    {
        HANDLE Handles[] = { _hDataEvent, _hConsumerStop };

        for (;;)
        {
            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                break;
            case WAIT_OBJECT_0 + 1:
                return;
            default:
                __debugbreak();
            }

            bool bNotEmpty;

            do 
            {
                BigData* item;

                if (!CyclicBufer::pop((void**)&item, &bNotEmpty)) __debugbreak();

                // check FIFO order
                if (item->_id != _waitReadId) __debugbreak();

                _waitReadId++;

                // process item

                // free item to the pool
                if (FreeItems::push(item))
                {
                    // stack was empty
                    _setFreeCount++;
                    SetEvent(_hFreeEvent);
                }

            } while (bNotEmpty);
        }
    }

    ~CPData()
    {
        if (_hConsumerStop) CloseHandle(_hConsumerStop);
        if (_hProducerStop) CloseHandle(_hProducerStop);
        if (_hFreeEvent) CloseHandle(_hFreeEvent);
        if (_hDataEvent) CloseHandle(_hDataEvent);

        if (_waitReadId != _writeId || !CyclicBufer::is_empty()) __debugbreak();

        DbgPrint("%s(%u %u %u)\n", __FUNCTION__, _writeId, _setFreeCount, _setDataCount);
    }

public:

    CPData()
    {
        _hFreeEvent = 0, _hDataEvent = 0, _hProducerStop = 0, _hConsumerStop = 0;
        _waitReadId = 0, _writeId = 0, _dwRefCount = 1;
        _setFreeCount = 0, _setDataCount = 0, _bStop = false;
    }

    void AddRef()
    {
        _MT_INCR(_dwRefCount);
    }

    void Release()
    {
        if (!_MT_DECR(_dwRefCount))
        {
            delete this;
        }
    }

    ULONG Create(ULONG n)
    {
        if (!CyclicBufer::create(n) || !FreeItems::create(n))
        {
            return ERROR_NO_SYSTEM_RESOURCES;
        }

        return (_hDataEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hFreeEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hProducerStop = CreateEvent(0, TRUE, FALSE, 0)) &&
            (_hConsumerStop = CreateEvent(0, TRUE, FALSE, 0)) ? 0 : GetLastError();
    }

    ULONG StartThread(bool bConsumer)
    {
        AddRef();

        if (HANDLE hThread = CreateThread(0, 0, bConsumer ? sConsumer : sProducer, this, 0, 0))
        {
            CloseHandle(hThread);
            return 0;
        }

        Release();

        return GetLastError();
    }

    ULONG Stop()
    {
        ULONG err = SetEvent(_hProducerStop) ? 0 : GetLastError();
        _bStop = true;
        return err;
    }
};

void BufTest()
{
    if (CPData* p = new CPData)
    {
        if (!p->Create(16))
        {
            if (!p->StartThread(false))
            {
                p->StartThread(true);
            }

            MessageBoxW(0, 0, L"Wait Stop", MB_ICONINFORMATION);
            p->Stop();
        }
        p->Release();
    }
    MessageBoxW(0,0,0,1);
}

1
移动智能指针不符合我的需求,因为这样我需要在运行时不断分配内存用于新的向量元素。
如果你预先分配足够的存储空间并实现自己的内存管理(如简单分离存储或池化),那么就不一定需要这样做。如果你这样做了,就可以随意交换,而且可以使用任何支持元素交换的环形缓冲区,保持与之前相同的线程安全性。你可以考虑只使用boost::pool而不是自己实现。

使用向量,我在构造函数中设置大小,如下所示: boost::object_pool<std::vector<float>> pool{10000000, 0}; std::vector<float> *vec = pool.construct(1000000);或者我误解了这个概念? - luxderfux
据我所理解:池化可以帮助我预分配内存,并在运行时更快地访问它。但是,由于向量的大小调整,类成员的初始化仍然是一项昂贵的任务。也许可以将对象保持在某种容器中,该容器允许借用和归还这些对象。或者是否有其他快速初始化向量的方法? - luxderfux
我所说的池化(pooling)就是这个意思:你永远不会删除任何东西,只是将其返回到池中,以供下一个请求使用(获取相同的物理、预分配和已初始化的对象)。如果您想让我详细说明,请告诉我,我会将其添加到原始答案中。 - Geezer
@luxderfux 抱歉我之前在讨论中回复了你,但是不确定这个怎么操作...我的评论是:好的,那么我需要对你的情况有一个清晰的理解才能这样做:根据你的图示,你的生产者将整个大向量推入队列中,直到消费者完全弹出它,对吗?如果是这样,那么你要交换的对象就是向量本身,对吗?当你谈论类成员的初始化时,你是指每个向量元素的成员吗? - Geezer
谢谢您的提问。大向量是最简单的情况。最终,我想要传递封装大数组的对象。我成功地构建了一个快速而简单的池容器,目前满足我的需求。所以感谢您将队列与池相结合的想法。看起来很有效 :) - luxderfux
显示剩余5条评论

0

尽管 boost::lockfree::spsc_queue 不支持移动操作,但您仍然可以实现。

以下是将向量移入和从队列中移出的示例:

struct Element {
    std::vector<int> data_;

    Element(std::vector<int>& data)
        : data_(std::move(data))
    {}

    Element(Element const&) = delete;
    Element operator=(Element const&) = delete;

    operator std::vector<int>&&() {
        return std::move(data_);
    }
};

int main() {
    boost::lockfree::spsc_queue<Element, boost::lockfree::capacity<2>> q;

    std::vector<int> a(1);
    assert(!a.empty());
    q.push(&a, &a + 1); // Move the vector into the queue.
    assert(a.empty());

    std::vector<int> b = q.front(); // Move the vector from queue.
    assert(!b.empty());
    q.pop();
}

-1

我使用过的一种技巧是...

void next_step(std::vector<std::string> &a)
{
    std::vector<std::string> v;
    v.swap(a);
    // process vector ...
}

不要交换或复制单个元素。 快速高效。

Mike


但是他提到他之前没有时间创建那些对象。即使他交换了资源,他也无法在最后将资源交换回进程。 - JackGrinningCat
@JackGrinningCat 这种技术避免了创建除空向量以外的任何东西。这几乎没有开销。除非我漏掉了什么? - Michael Surette
你确定吗?从A获取的内存将在(线程2)的下一步中释放,并且如果没有池,则必须在线程1中获取。如果不是在线程2的末尾,至少也要在信号处理链的末尾获取。 - JackGrinningCat
1
@JackGrinningCat 当然,处理的一部分是以类似的方式将向量直接传递给下一步,或通过现有的环形缓冲系统间接传递。 - Michael Surette
std::vector::swap 不是线程安全/无锁的。 std::vector 实现通常有 3 个指针(开始/结束/已使用),因此只有具有 32 位指针的 x86-64(例如 x32 ABI)才能使用 16 字节的 cmpxchg16b 进行无锁交换,以提供一个原子替换所有成员的方法。 - Peter Cordes
即使使用互斥保护,交换速度仍然非常快,与其他替代方案相比,交换时间是微不足道的。 - Michael Surette

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接