使用std::atomic和std::condition_variable同步不可靠

13

我在一个用C++11编写的分布式作业系统中实现了一个栅栏(即在工作线程池之外的线程可能会请求阻塞,直到当前所有已安排的作业都完成),使用以下结构:

struct fence
{
    std::atomic<size_t>                     counter;
    std::mutex                              resume_mutex;
    std::condition_variable                 resume;

    fence(size_t num_threads)
        : counter(num_threads)
    {}
};

实现围栏功能的代码如下:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}

如果线程在一段时间内进入栅栏,那么这个方法会非常有效。然而,如果它们几乎同时尝试这样做,有时候会出现在原子递减(1)和开始等待条件变量(3)之间,线程会让出CPU时间并且另一个线程将计数器递减到零(1),并触发条件变量(2)。这将导致前一个线程在(3)中永远等待,因为它在收到通知后开始等待。

使它可行的方法是在(2)之前加上10毫秒的延迟,但由于明显的原因,这是不可接受的。

您有什么关于如何以高效的方式修复这个问题的建议吗?

2个回答

14

你的诊断是正确的,这段代码容易出现你所描述的遗漏条件通知的情况。也就是说,在一个线程锁定互斥量后但在等待条件变量之前,另一个线程可能会调用notify_all(),导致第一个线程错过了该通知。

一个简单的修复方法是在递减计数器并在通知时锁定互斥量:

void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        f->resume.notify_all();
    }
    else do {
        f->resume.wait(lock);
    } while(f->counter);
}
在这种情况下,计数器不需要是原子的。
使用互斥锁在发出信号之前进行加锁的额外奖励(或惩罚,取决于观点)是(来自此处):
“pthread_cond_broadcast()”或“pthread_cond_signal()”函数可以由线程调用,无论它当前是否拥有线程调用“pthread_cond_wait()”或“pthread_cond_timedwait()”期间与条件变量关联的互斥锁。但是,如果需要可预测的调度行为,则调用“pthread_cond_broadcast()”或“pthread_cond_signal()”的线程应锁定该互斥锁。
关于“while”循环(来自此处):
“pthread_cond_timedwait()”或“pthread_cond_wait()”函数可能会发生虚假唤醒。由于从“pthread_cond_timedwait()”或“pthread_cond_wait()”返回并不意味着对该谓词的值做了任何事情,因此在此类返回时应重新评估该谓词。

“unique_lock”缺少其所需的模板参数:“unique_lock<mutex>”,这在问题和答案中都是如此。但除此之外,我同意,+1。 - Howard Hinnant
哦,类内有一个 typedeffence 是一个嵌套结构体),但我已经添加了模板参数以提高可读性,谢谢。 - IneQuation
@Maxim Yegorushkin,为什么你在最后一个代码块中添加了一个 while(f->counter) 语句? - IneQuation
1
@IneQuation,我为你添加了有关while循环的注释。 - Maxim Egorushkin
好的,那么这就涉及到*nix上的pthread实现。在Windows上也需要吗?我知道这不会有害,现在我想问的是这是否必要。 - IneQuation
1
@IneQuation 在可移植的C++代码中需要使用,参见http://en.cppreference.com/w/cpp/thread/condition_variable/wait 不确定condition_variable在Windows上是如何实现的,您可以在Boost条件变量源代码中查找。 - Maxim Egorushkin

-1
为了保持原子操作的高性能而非使用完整的互斥锁,您应该将等待条件更改为锁定、检查和循环。
所有条件等待都应该以这种方式进行。条件变量甚至有第二个参数可以等待一个谓词函数或 lambda 表达式。
代码可能会像这样:
void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        while(f->counter) {
            f->resume.wait(lock);   // (3)
        }
    }
}

3
据我所知,你只是添加了一个循环来捕获虚假唤醒。这并没有解决我最初描述的问题 - 一个线程在执行(3)时仍然可能让出CPU,使得另一个线程在原始线程最终到达(3)之前到达(2)。 - IneQuation
除此之外,您所说的“锁定、检查和循环”是指自旋锁吗? - IneQuation

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接