我正在尝试实现一个使用两个线程的类:一个用于生产者,另一个用于消费者。当前的实现不使用锁:
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
Worker() : working_(false), done_(false) {}
~Worker() {
done_ = true; // exit even if the work has not been completed
worker_.join();
}
void enqueue(int value) {
queue_.push(value);
if (!working_) {
working_ = true;
worker_ = std::thread([this]{ work(); });
}
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_ = false;
}
private:
std::atomic<bool> working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
应用程序需要在一定时间内对工作项进行排队,然后休眠等待事件发生。以下是模拟该行为的最小主体:
int main()
{
Worker w;
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
我很确定我的实现有错误:如果工作线程完成并在执行
working_ = false
之前,另一个enqueue
到达怎么办?有可能使我的代码线程安全而不使用锁吗?解决方案需要:
- 快速入队
- 即使队列不为空,析构函数也必须退出
- 没有繁忙等待,因为工作线程处于空闲状态的时间很长
- 如果可能,不要使用锁
我根据你的建议对
Worker
类进行了另一种实现。这是我的第二次尝试:class Worker
{
public:
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false) { }
~Worker() {
// exit even if the work has not been completed
done_ = true;
if (worker_.joinable())
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set()) {
if (worker_.joinable())
worker_.join();
worker_ = std::thread([this]{ work(); });
}
return enqueued;
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
我在
enqueue
方法中引入了worker_.join()
。这可能会影响性能,但非常罕见(当队列为空并且线程退出之前,另一个enqueue
出现)。working_
变量现在是一个atomic_flag
,在enqueue
中设置,在work
中清除。working_.clear()
后的附加while
是必需的,因为如果在clear
之前但在while
之后推入另一个值,则该值不会被处理。这个实现是正确的吗?
我进行了一些测试,实现似乎可以工作。
OT:将其作为编辑或答案更好?
enqueue
是由关键性能代码调用的,所以我试图优化那部分。是的,线程挂起是一个可能的问题。 - mbrtenqueue
能够工作并且即使存在优先级反转和/或工作线程已被暂停和/或工作线程永远不会被调度也能够低成本地推进?性能有多关键-每像素每帧操作在4k x 2k缓冲区的性能至关重要,如果图像不能以60 Hz渲染,则有人会死亡吗?还是说“我不想在用户单击UI元素时使他们感到恼怒”?可能介于两者之间,但是在哪里呢? - Yakk - Adam Nevraumont