在无锁实现中,没有互斥锁的条件变量

6
我有一个基于std::atomics实现的无锁单生产者多消费者队列,类似于Herb Sutter在CPPCon2014上的演讲。有时,生产者速度过慢以至于不能满足所有消费者的需求,因此消费者可能会饥饿。我想防止饥饿的消费者反复查询队列,因此我增加了一个10ms的睡眠时间。这个值是任意的且不够优化。我希望使用一种信号方式,让消费者在队列中有空闲位置时可以通知生产者。在基于锁的实现中,我自然会使用std::condition_variable来完成这个任务。但现在在我的无锁实现中,我不确定是否应该引入一个mutex,只是为了能够使用std::condition_variable。 我只是想问你,在这种情况下,是否应该使用mutex?
编辑:我只有一个从不休眠的生产者,同时存在多个可能会饥饿而休眠的消费者。因此整个系统总是在向前进行,因此我认为它是无锁的。我的当前解决方案是将以下代码放入消费者的GetData函数中:
std::unique_lock<std::mutex> lk(_idleMutex);_readSetAvailableCV.wait(lk); 生产者线程在准备好新数据时使用以下代码:
_readSetAvailableCV.notify_all();

1
一个带有信号的睡眠线程基本上从定义上来说不是无锁的。你明白无锁主要关注于线程调度和进展方面的某些保证,而不是“更快”的问题,对吧?你需要哪些无锁保证呢? - Yakk - Adam Nevraumont
我想我不太明白。如果是消费者挨饿,为什么会在队列中有一个空位时通知生产者呢? - Davislor
但是,如果您需要一个具有多个编写器的变量,这意味着至少有一个编写器已准备好消耗更新,那么这听起来像是原子标志。 - Davislor
如果不了解为什么要使用带有原子操作的无锁队列,那么很难给出关于正确设计选择的有用答案。这听起来就像你做出了错误的选择,一开始应该使用锁。在你的使用情况下,无锁设计的优势是什么,可以证明额外的争用是合理的吗? - David Schwartz
2
@BenjaminMenkuec,你应该使用互斥锁,因为互斥锁通过取消竞争线程的调度来最小化竞争。原子操作和无锁算法并不能避免竞争带来的惩罚。 - David Schwartz
显示剩余3条评论
4个回答

4
如果大多数线程只是在等待生产者将资源入队,我不确定无锁实现是否值得努力。大部分时间,你的线程会休眠,它们不会为了争夺队列锁而相互抵消。
因此,我认为(根据您提供的数据量),将所有内容更改为使用mutex + conditional_variable就可以了。当生产者将资源入队时,它仅通知一个线程(使用notify_one()),并释放队列锁。锁定队列并出队资源的消费者如果队列再次为空,则返回睡眠状态。线程之间不应该有任何真正的“摩擦”(如果你的生产者很慢),所以我会选择这种方法。

我同意你的观点。在使用互斥锁实现之前,线程之间没有太多的摩擦。但是,为了练习,我想将其转换为无锁设计。我测量了速度,与以前大致相同。 PS:根据系统和硬盘速度的不同,有时消费者更快,有时生产者更快。 - bodzcount
3
让代码变得更糟糕似乎不是一个有用的实践。 - David Schwartz

2
我刚刚观看了这个关于并发 TS 的 CPPCON 视频:Artur Laksberg @cppcon2015
在讲座的中间部分,Artur 解释了如何使用障碍和闩锁来解决我的问题。他还展示了一种使用条件变量的现有解决方法,就像我所做的那样。他强调了用于此目的的条件变量的一些弱点,例如虚假唤醒和在进入等待之前缺少通知信号。然而,在我的应用程序中,这些限制不是问题,所以我认为现在我将使用我在帖子编辑中提到的解决方案-直到障碍/闩锁可用。感谢大家的评论。

1
通过最小化设计更改,您可以简单地使用信号量。信号量开始为空,并且每次生产者推送到队列时增加。消费者在从队列中弹出之前首先尝试将信号量降低。
C++11不提供信号量实现,但可以使用互斥锁、条件变量和计数器来模拟信号量。 如果您确实希望在生产者比消费者更快时获得无锁行为,则可以使用双重检查锁定。
/* producer */
bool was_empty = q.empty_lock_free();
q.push_lock_free(x);
if (was_empty) {
    scoped_lock l(q.lock());
    if (!q.empty()) {
        q.cond().signal();
    }
}

/* consumers */
for (;;) {
    if (q.empty_lock_free()) {
        scoped_lock l(q.lock());
        while (q.empty()) {
            q.cond().wait();
        }
        x = q.pop();
        if (!q.empty()) {
            q.cond().signal();
        }
    } else {
        try {
            x = q.pop_lock_free();
        } catch (empty_exception) {
            continue;
        }
        break;
    }
}

0

pthreads 的一个可能性是,饥饿的线程使用 pause() 睡眠并使用 SIGCONT 唤醒。每个线程都有自己的 awake 标志。如果生产者发布新输入时有任何线程处于睡眠状态,请使用 pthread_kill() 唤醒其中一个。


1
C++11的等价方法是condition_variable::notify_one() - Davislor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接