C++11线程安全队列

92

我正在从事的项目使用多线程对一组文件进行处理。每个线程可以将文件添加到待处理的文件列表中,因此我编写了一个(我认为是)线程安全的队列。以下是相关部分:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

然而,我偶尔会在if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } 代码块中出现段错误,通过使用gdb进行检查发现段错误是因为空队列导致的。这怎么可能?我的理解是wait_for仅在被通知后才返回cv_status :: no_timeout,并且这应该只会在FileQueue :: enqueue刚刚将新项目推入队列之后发生。


4
请看这里:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html - GManNickG
1
问题是,为什么你要通过引用传递方式来获取 filename?我在这里看不出任何理由。 - Tony The Lion
1
@TonyTheLion 通常在C++中,通过引用传递对象比复制更有效。在这种情况下,我还使用了移动语义,它允许编译器将字符串的内容移动到队列中,而不是再次复制。 - Matt Kline
2
实际上,在这里利用移动语义的首选方法是使用std::move,并通过值而不是非const右值引用获取enqueuefilename参数。目前,它只能被调用为右值,这可能不是您想要的。 - Ben Hymers
进一步强调@BenHymers所说的,这个调用者必须使用std::move;在这里使用它是没有意义的,因为您已经有了一个右值引用,而所有std::move所做的就是将其转换为一个。在这里声明一个按值传递的参数,然后使用std::move将其推入队列是理想的,因为调用者可以通过值传递(在调用之前进行复制,然后通过std::move将该副本移动到队列中)或通过右值引用(调用者使用std::move,在这种情况下,移动构造函数被调用来构造参数,然后再次std::move推送到队列中。 - WhozCraig
显示剩余4条评论
9个回答

88
最好将条件(通过您的条件变量监视)设为while循环的反向条件:while(!some_condition)。在此循环内,如果条件不满足,则进入睡眠状态,触发循环体。
这样,如果您的线程被唤醒-可能是因为虚假唤醒-则在继续之前,您的循环仍会检查条件。将“条件”视为感兴趣的状态,“条件变量”更像是系统发出的信号,表明该状态“可能”准备就绪。循环将进行实际确认并在条件不满足时进入睡眠状态。
我刚编写了一个异步队列模板,希望这可以帮助您。在此,q.empty()是我们想要的相反条件:队列中有东西。因此它作为while循环的检查。
#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif

3
谢谢!幸运的是,我已经使用谓词(在此处描述)解决了这个问题。 - Matt Kline
2
在我看来,这是最简单和最优雅的解决方案。 - kuroi neko
19
“FYI: The while(q.empty()) loop is equivalent to: c.wait( lock, [&]{ return !q.empty(); } );” 的意思是,“顺便提一下,while(q.empty()) 循环等同于 c.wait( lock, [&]{ return !q.empty(); } );”。 - Ahmed Nassar
1
当调用dequeue时,如果队列为空,它会不必要地获取锁并保持它。这将导致入队者被阻塞。你遇到了死锁。 - Ajay
4
@Ajay,为了那些将来阅读你评论的人(像我一样)的方便,请注意,这并不是情况,因为condition_variable::wait会释放锁,并且只有在被唤醒后才重新获取锁定 - GPhilo
显示剩余6条评论

37
根据标准,即使事件尚未发生,也允许“条件变量”发出虚假唤醒。在虚假唤醒的情况下,它将返回cv_status::no_timeout(因为它是被唤醒而不是超时),即使它没有被通知。正确的解决方法当然是在继续之前检查唤醒是否真实合法。
细节在标准§30.5.1 [thread.condition.condvar]中指定:
—该函数将在以下情况下取消阻止:通过调用notify_one() ,通过调用notify_all() ,根据abs_time指定的绝对超时(30.2.4)到期,或者虚假唤醒。
返回:如果abs_time指定的绝对超时(30.2.4)已过,则返回cv_status :: timeout,否则返回cv_status :: no_timeout。

1
那你认为应该怎么做呢?只是再次检查队列是否为空吗? - Matt Kline
4
附录:我可以使用像这里的谓词来保护免受虚假唤醒。 - Matt Kline
2
是的。它被称为“条件变量”,因为它与某些条件相关联,您必须检查该条件是否实际为真。在您的情况下,要检查的条件是!q.empty() - Jonathan Wakely
5
如果您将lambda函数用作wait()调用的可选参数,它将为您执行检查,并防止虚假唤醒产生任何影响。 - derpface
7
历史上,这样做最有可能是为了支持低级别Unix系统调用的行为。如果从客户端设计的角度来看,您可能会觉得有些不尽如人意,1989年的Richard Gabriel也有同感。他对此的思考成为了一篇非常著名的软件设计文章,“更差就是更好”的崛起 - T.E.D.

20

这可能是你应该做的方法:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}

谢谢!我最终做了类似的事情。 - Matt Kline
对于那段本来很好的代码,有两点需要注意: 1)在调用notify_one之前,我会解锁互斥量,原因可以在http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one中找到。 2)等待过程可能会出现虚假唤醒,因此我还会引入一个bool变量,指示push确实已经完成。 - IceFire
1
  1. 微小的优化。
  2. wait_for 的这个重载通过第二个参数处理虚假唤醒。
- ronag
抱歉,你是对的。实际上,1)已经通过lock_guard完成了。我的2)推理错误在于我认为如果队列不为空,push操作可以继续进行...然而,在这种情况下,仍被阻塞的互斥锁将始终使wait_for等待。谢谢! - IceFire
我有一个关于这个的问题。为什么在push和pop操作中使用的锁包装器不同?为什么不在两者上都使用std::unique_lock呢? - user7024
我对此有一个问题。为什么在push和pop操作中使用的锁包装器不同?为什么不都使用std::unique_lock呢? - user7024

13

除了已经接受的答案,我认为实现一个正确的多生产者/多消费者队列是困难的(尽管自从C++11以来变得更容易了)。

我建议你尝试使用(非常好的)无锁boost库,"queue"结构将实现您想要的功能,并提供无等待/无锁担保以及无需C++11编译器

我现在添加这个答案是因为无锁库对于boost来说是相当新的(我认为自1.53以来)。


2
感谢指出这个库。目前似乎还没有队列的文档,您有什么建议在哪里找到吗? - Matt Kline

5
我会将您的出队函数改写为以下形式:

我会重新编写您的出队函数:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

它更短,并且不像你的代码那样有重复。唯一的问题是它可能会等待比超时时间更长。为了防止这种情况,您需要在循环之前记住开始时间,检查超时并相应地调整等待时间。或者在等待条件中指定绝对时间。


1

1

BlockingCollection 是一个C++11线程安全的集合类,提供对队列、栈和优先级容器的支持。它处理了你所描述的“空”队列场景,以及“满”队列场景。


1
这是我在C++20中实现的线程队列:
#pragma once
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <concepts>
#include <list>

template<typename QueueType>
concept thread_queue_concept =
    std::same_as<QueueType, std::deque<typename QueueType::value_type, typename QueueType::allocator_type>>
    || std::same_as<QueueType, std::list<typename QueueType::value_type, typename QueueType::allocator_type>>;

template<typename QueueType>
    requires thread_queue_concept<QueueType>
struct thread_queue
{
    using value_type = typename QueueType::value_type;
    thread_queue();
    explicit thread_queue( typename QueueType::allocator_type const &alloc );
    thread_queue( thread_queue &&other );
    thread_queue &operator =( thread_queue const &other );
    thread_queue &operator =( thread_queue &&other );
    bool empty() const;
    std::size_t size() const;
    void shrink_to_fit();
    void clear();
    template<typename ... Args>
        requires std::is_constructible_v<typename QueueType::value_type, Args ...>
    void enque( Args &&... args );
    template<typename Producer>
        requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
    void enqueue_multiple( Producer producer );
    template<typename Consumer>
        requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
    void dequeue_multiple( Consumer consumer );
    typename QueueType::value_type dequeue();
    void swap( thread_queue &other );
private:
    mutable std::mutex m_mtx;
    mutable std::condition_variable m_cv;
    QueueType m_queue;
};

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue()
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( typename QueueType::allocator_type const &alloc ) :
    m_queue( alloc )
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( thread_queue &&other )
{
    using namespace std;
    lock_guard lock( other.m_mtx );
    m_queue = move( other.m_queue );
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue const &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = other.m_queue;
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue &&other )
{
    using namespace std;
    lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = move( other.m_queue );
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
bool thread_queue<QueueType>::thread_queue::empty() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.empty();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
std::size_t thread_queue<QueueType>::thread_queue::size() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.size();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::shrink_to_fit()
{
    std::lock_guard lock( m_mtx );
    return m_queue.shrink_to_fit();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::clear()
{
    std::lock_guard lock( m_mtx );
    m_queue.clear();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename ... Args>
    requires std::is_constructible_v<typename QueueType::value_type, Args ...>
void thread_queue<QueueType>::thread_queue::enque( Args &&... args )
{
    using namespace std;
    unique_lock lock( m_mtx );
    m_queue.emplace_front( forward<Args>( args ) ... );
    m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
typename QueueType::value_type thread_queue<QueueType>::thread_queue::dequeue()
{
    using namespace std;
    unique_lock lock( m_mtx );
    while( m_queue.empty() )
        m_cv.wait( lock );
    value_type value = move( m_queue.back() );
    m_queue.pop_back();
    return value;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Producer>
    requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
void thread_queue<QueueType>::enqueue_multiple( Producer producer )
{
    using namespace std;
    lock_guard lock( m_mtx );
    for( std::pair<bool, value_type> ret; (ret = move( producer() )).first; )
        m_queue.emplace_front( move( ret.second ) ),
        m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Consumer>
    requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
void thread_queue<QueueType>::dequeue_multiple( Consumer consumer )
{
    using namespace std;
    unique_lock lock( m_mtx );
    for( ; ; )
    {
        while( m_queue.empty() )
            m_cv.wait( lock );
        try
        {
            bool cont = consumer( move( m_queue.back() ) );
            m_queue.pop_back();
            if( !cont )
                return;
        }
        catch( ... )
        {
            m_queue.pop_back();
            throw;
        }
    }
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::swap( thread_queue &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue.swap( other.m_queue );
}

唯一的模板参数是BaseType,它可以是std::deque类型或std::list类型,并受thread_queue_concept限制。这个类将此类型用作内部队列类型。选择一个最适合你的应用程序的BaseType。我可能已经在更具区分性的thread_queue_concepts上限制了该类,以检查BaseType的所有使用部分,以便该类可能适用于与std::list<>或std::deque<>兼容的其他类型,但我懒得为这种不太可能出现的情况实现它。
这段代码的优点之一是enqueue_multiple和dequeue_multiple。这些函数被赋予了一个函数对象,通常是一个lambda,它可以在只有一个锁定步骤的情况下排队或出队多个项目。对于enqueue,这总是成立的,对于dequeue,这取决于队列是否有要提取的元素。
如果你有一个生产者和多个消费者,那么enqueue_multiple通常是有意义的。它会导致较长时间的锁定期,因此仅在项能够快速生产或移动时才有意义。
如果你有多个生产者和一个消费者,那么dequeue_multiple通常是有意义的。在这里,我们也有较长的锁定期,但由于对象通常只是快速移动,所以通常不会有影响。
如果dequeue_multiple的消费者函数对象在消费时抛出异常,那么异常会被捕获,并且提供给消费者的元素(作为底层队列类型对象内的右值引用)将被删除。
如果你想使用C++11和这个类,你必须删除概念或使用#if defined(__cpp_concepts)禁用它们。

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接