C++11无锁单生产者单消费者:如何避免忙等待

9

我正在尝试实现一个使用两个线程的类:一个用于生产者,另一个用于消费者。当前的实现不使用锁:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

应用程序需要在一定时间内对工作项进行排队,然后休眠等待事件发生。以下是模拟该行为的最小主体:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

我很确定我的实现有错误:如果工作线程完成并在执行working_ = false之前,另一个enqueue到达怎么办?有可能使我的代码线程安全而不使用锁吗?
解决方案需要:
  • 快速入队
  • 即使队列不为空,析构函数也必须退出
  • 没有繁忙等待,因为工作线程处于空闲状态的时间很长
  • 如果可能,不要使用锁
编辑
我根据你的建议对Worker类进行了另一种实现。这是我的第二次尝试:
class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

我在enqueue方法中引入了worker_.join()。这可能会影响性能,但非常罕见(当队列为空并且线程退出之前,另一个enqueue出现)。working_变量现在是一个atomic_flag,在enqueue中设置,在work中清除。working_.clear()后的附加while是必需的,因为如果在clear之前但在while之后推入另一个值,则该值不会被处理。
这个实现是正确的吗?
我进行了一些测试,实现似乎可以工作。
OT:将其作为编辑或答案更好?

1
你为什么反对锁?你希望你的线程被挂起吗?还是你认为锁会降低性能? - Yakk - Adam Nevraumont
@Yakk 我不是完全反对锁。enqueue 是由关键性能代码调用的,所以我试图优化那部分。是的,线程挂起是一个可能的问题。 - mbrt
@michele.bertasi,所以您想让enqueue能够工作并且即使存在优先级反转和/或工作线程已被暂停和/或工作线程永远不会被调度也能够低成本地推进?性能有多关键-每像素每帧操作在4k x 2k缓冲区的性能至关重要,如果图像不能以60 Hz渲染,则有人会死亡吗?还是说“我不想在用户单击UI元素时使他们感到恼怒”?可能介于两者之间,但是在哪里呢? - Yakk - Adam Nevraumont
我认为最好不要让线程在中间停止,而是让它等待信号量。但你必须在析构函数中重置信号量。 - Cameron
@Cameron 也许你是对的。我会试一试。 - mbrt
显示剩余3条评论
4个回答

2
如果工作线程完成并且在执行working_=false之前另一个排队请求到达怎么办?
然后该值将被推送到队列中,但在标志设置后直到另一个值入队后才会被处理。您(或您的用户)可以决定是否可以接受。这可以通过使用锁来避免,但它们不符合您的要求。
如果正在运行的线程即将完成并设置了working_=false;但在下一个值入队之前尚未停止运行,则代码可能会失败。在这种情况下,您的代码将调用运行线程上的{{link1:operator =}},根据链接文档,这将导致调用std::terminate
在将worker分配给新线程之前添加worker_.join()应该可以防止这种情况的发生。
另一个问题是,如果队列已满(因为它具有固定大小),则queue_.push可能会失败。目前,您只是忽略了这种情况,该值将不会添加到完整的队列中。如果等待队列有空间,则无法快速进行入队操作(在极端情况下)。您可以获取push返回的布尔值(指示是否成功)并从enqueue中返回它。这样,调用者就可以决定是要等待还是放弃该值。

或者使用非固定大小的队列。Boost对此选择的说法如下:

可用于在推送期间完全禁用动态内存分配,以确保无锁行为。 如果将数据结构配置为固定大小,则内部节点存储在数组中,并通过数组索引进行寻址。 这限制了队列的可能大小,最多可以由索引类型(通常为2 ** 16-2)寻址的元素数量, 但在缺乏双宽比较和交换指令的平台上,这是实现无锁自由的最佳方法。


我不会在每次入队时都启动一个线程,而只会在第一次入队时启动。该线程将持续工作直到队列为空,然后停止(这是因为工作操作比入队操作慢得多;也许从我发布的代码中并不清楚)。忙等待的问题在于,在我的特定情况下,工作线程有很长时间不工作。我之所以做出这个选择,是基于这个具体原因。 - mbrt
啊,是的,我读错了你的代码,我的回答下半部分基本上是不正确的。我现在已经把它删除了。 - eerorika
我已经编辑了我的问题,以更清楚地阐明要求。 - mbrt
我已经修正了我的回答,因为我对你的代码的错误解释仍然存在一些错误。现在我也怀疑原子比较在这种情况下并没有帮助。 - eerorika
你说得对,入队操作可能会失败,但在我的情况下,队列的大小是有限的。然而,为了提供断言,最好返回一个布尔值来表示是否失败。 - mbrt

1

你的工作线程需要超过2种状态。

  • 未运行
  • 执行任务
  • 空闲关闭
  • 关闭

如果你强制关闭,它会跳过空闲关闭。如果你没有任务了,它会转换到空闲关闭。在空闲关闭状态下,它会清空任务队列,然后进入关闭状态。

设置关闭状态,然后你走到了你的工作任务的末尾。

生产者首先将东西放在队列上。然后它检查工作线程状态。如果是关闭或空闲关闭,则首先join它(并将其转换为未运行),然后启动一个新的工作线程。如果未运行,则仅启动一个新的工作线程。

如果生产者想要启动一个新的工作线程,它首先确保我们处于未运行状态(否则,逻辑错误)。然后我们转换到执行任务状态,然后启动工作线程。

如果生产者想要关闭辅助任务,它设置完成标志。然后它检查工作线程状态。如果不是未运行,则加入它。

这可能导致一个无缘无故启动的工作线程。

有一些情况下,上述代码可能会被阻塞,但以前也有一些情况发生过。

然后,我们编写一个正式或半正式的证明,证明上述代码不会丢失消息,因为当编写无锁代码时,直到你有了证明才算完成。


我不明白为什么需要“空闲关闭”。难道“关闭”不足够吗? - mbrt
@michele.bertasi 不必在外部显示,但实际工作线程中有两条不同的代码路径。 在进入空闲关闭后,您必须清空队列(或冒失丢失),然后进入关闭状态。 然而,完成操作时,您不希望清空队列,因此直接进入关闭状态。 - Yakk - Adam Nevraumont

0
非常不完整的回答:我认为所有这些原子操作、信号量和状态都是从“线程”到“工作线程”的一种反向通信渠道。为什么不使用另一个队列呢?至少,思考一下这个问题会帮助你解决这个问题。

0
这是我对问题的解决方案。虽然我不太喜欢回答自己的问题,但我认为展示实际的代码可能会帮助其他人。
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"

using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;

class Worker
{
public:
    // the worker thread starts in the constructor
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
        , worker_([this]{ work(); })
    { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        semaphore_.signal();
        worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set())
            // signal to the worker thread to wake up
            semaphore_.signal();
        return enqueued;
    }

    void work() {
        int value;
        // the worker thread continue to live
        while (!done_) {
            // wait the start signal, sleeping
            semaphore_.wait();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    binsem semaphore_;
    Queue queue_;
    std::thread worker_;
};

我尝试了@Cameron的建议,不关闭线程并添加信号量。这实际上仅在第一个enqueue和最后一个work中使用。这不是无锁的,但仅限于这两种情况。

我进行了一些性能比较,比较了我的先前版本(请参见我的编辑问题)和这个版本。当没有太多的启动和停止时,没有显着的差异。然而,当需要signal工作线程而不是启动新线程时,enqueue速度快10倍。这是一个罕见的情况,所以它并不是非常重要,但无论如何都是一种改进。

此实现满足以下条件:

  • 在常见情况下无锁(当enqueuework正在忙碌时);
  • 如果长时间没有enqueue,则不会有繁忙等待
  • 析构函数尽快退出
  • 正确性?? :)

对我来说好像好一点,但肯定不正确:在enqueue期间,working_可能为真(因此信号量不会被设置),但工作线程可能已经退出了第一个内部while循环(并且尚未清除working_)。然后它将永远等待信号量(直到下一个enqueue)。我建议您专门使用信号量--虽然刚刚翻了一下,我没有看到任何漂亮的旋转几个周期(无锁)然后阻塞在操作系统原语上的信号量... - Cameron
如果工作线程尚未清除working_,但另一个enqueue到来,则没有问题,因为在clear之后,会有另一个循环。这确保了这些项被处理。同时,如果出现另一个enqueue,则working_标志现在为0,因此信号量获得了post - mbrt
哦,我错过了。有趣。那为什么不干脆把 working_ 全部去掉呢?(啊,我想我知道为什么了:信号量是重量级的——我真的看不出来为什么信号量不能只是一个原子整数和一个简单的自旋等待,只有在必要时才回退到更重的操作系统原语;我可能很快就会写这样的实现,似乎很多代码都可以从轻量级信号量中受益)。 - Cameron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接