公平临界区(Linux)

5

在一个多线程的Linux应用程序中,我使用互斥锁来保护关键部分。这非常有效,但公平性是个问题。可能会发生这样的情况:一个线程离开关键部分并立即重新进入,不给其他线程任何机会。例如:

while(true)
{
    critsect.enter();
    ... do calculations ...
    ... maybe call a blocking operation so we sleep ...
    critsect.leave();
}

可能会很有可能阻止其他线程进入同一关键部分。互斥锁不公平。

有没有办法使关键部分变得公平?我考虑添加一个队列,以便按照它们的“到达”顺序执行关键部分。或者至少计数器,如果其他线程正在等待,则在解锁后进行pthread_yield()。

这种要求有推荐的做法吗?


你能否发布一个权威的资源来支持你的公平性主张。在我看来,我认为“可能发生一个线程离开关键部分并立即重新进入不会给其他任何线程机会”的说法是不正确的。 - sehe
当互斥锁被解锁时,线程会继续运行。特别是在解锁之前有线程切换(例如阻塞操作)的情况下,极有可能线程会不间断地执行到下一个锁。 现在,pthread_mutex是一个非排队互斥锁,因此即使某些其他当前不活动的线程正在阻塞锁调用,我也可以锁定它。我没有明确的资源记录这一点,但stackoverflow问题3045066显示了一个示例,您可以在单核系统上运行它。 - Frank
在关键部分内阻塞或休眠是不明智的,因为其他线程无法利用此睡眠时间来运行。 - fa.
懒人专用:http://stackoverflow.com/questions/3045066/permanent-mutex-locking-causing-deadlock - slaphappy
5个回答

9
您可以通过pthread互斥锁来构建一个先进先出的“票据锁”,操作步骤如下:
#include <pthread.h>

typedef struct ticket_lock {
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    unsigned long queue_head, queue_tail;
} ticket_lock_t;

#define TICKET_LOCK_INITIALIZER { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER }

void ticket_lock(ticket_lock_t *ticket)
{
    unsigned long queue_me;

    pthread_mutex_lock(&ticket->mutex);
    queue_me = ticket->queue_tail++;
    while (queue_me != ticket->queue_head)
    {
        pthread_cond_wait(&ticket->cond, &ticket->mutex);
    }
    pthread_mutex_unlock(&ticket->mutex);
}

void ticket_unlock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);
    ticket->queue_head++;
    pthread_cond_broadcast(&ticket->cond);
    pthread_mutex_unlock(&ticket->mutex);
}

在这种方案下,当一个线程处于ticketlock保护的关键部分时,不会持有低级别的pthread互斥锁,从而允许其他线程加入队列。


TICKET_LOCK_INITIALIZER 是否应该更新为 {PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER, 0, 0 }?看起来这样会更完整一些... - HighExodus
1
@HighExodus:虽然可以这样做,但是这是不必要的——在C语言中,对象永远不会被部分初始化,任何没有显式初始化器的聚合成员都会被初始化为相应类型的零。 - caf
有一个额外的好处是可以将票号写入调试日志中,这可以帮助某人调查例如死锁等问题。 - Arne Vogel
在互斥锁之前,应该原子化地初始化 'queue_me'。如果在此之前没有初始化,就无法保证先进先出的顺序,因为第一个线程将获得锁,其他线程将等待它。在第一个线程释放锁之后,下一个被唤醒的线程可能是任何一个等待获取锁的线程。 - Carlos Barcellos

4
即使有一个公平的临界区,代码的性能可能仍然很差,因为如果临界区被长时间持有,线程将经常等待它。因此,建议您尝试重构代码,使其不需要在长时间内锁定临界区。可以通过完全使用不同的方法(通过消息队列传递对象通常是推荐的,因为易于正确实现),或者至少通过在本地变量上执行大部分计算而不持有锁定,然后只在存储结果时才获取锁定来实现。如果锁定时间较短,则线程等待时间将减少,这通常会提高性能并使公平性成为非问题。您还可以尝试增加锁定粒度(单独锁定较小的对象),这也将减少争用。
编辑:好的,考虑了一下,我相信Linux中的每个关键段都是近似公平的。每当有睡眠者时,解锁操作必须进入内核以告诉它唤醒它们。从内核返回时,调度程序运行并选择具有最高优先级的进程。等待时睡眠者的优先级会升高,因此在某个时刻,它们将足够高,以致释放将导致任务切换。

是的,这当然是一个正确的陈述。然而,它并没有真正解决问题,因为即使关键部分很小,如果我在循环中调用它们(甚至不必是紧密的循环,因为线程调度不是微秒级的),同样的问题也可能会出现。在我的特定情况下,由于受保护的代码的性质,没有太多的重构可能。无论如何,问题是如何获得公平的关键部分。 - Frank
1
让我怀疑你需要将“进行一些计算”放在关键部分内。只需在该部分中复制所需的少量数据,然后在外部(但在循环中)进行计算。 - fa.
如果您锁定了一个循环,但仅在其中的一小部分时间内持有锁,则其他线程需要等待的概率会降低。但是,如果您无法重构代码,则需要使用公平的临界区。思考一下,我实际上相信这是必要的。 - Jan Hudec

0
如果你的说法是正确的(我没有时间去查看,而且看起来你在发布问题之前已经进行了研究),我建议
 sleep(0);

在关键部分之间显式地产生。
while(true)
{
    critsect.enter();
    ... do calculations ...
    ... maybe call a blocking operation so we sleep ...
    critsect.leave();
    sleep(0);
}

当然啦 :-) 但我不想这样做,因为这种情况只会在大约20%的情况下发生。所以每次让出CPU都会使线程失去它们分配的CPU时间,从而降低性能。 - Frank

0

好的,这样怎么样:

while(true)
{
    sema.wait;
    critsect.enter();
    sema.post;
    ... do calculations ...
    ... maybe call a blocking operation so we sleep ...
    critsect.leave();
}

将信号量计数初始化为1。在尝试获取CS之前,让其他线程也等待信号量,并在完成后发出信号。如果“calculate”线程获得了信号量,它可以进入CS并锁定它。在锁定内部,但在长时间计算之前,信号量被发出,然后另一个线程可以到达CS但无法进入其中。当“calculate”线程退出锁定时,它不能循环并重新锁定它,因为信号量计数为零,所以其他线程获取锁定。'calculate'线程必须等待信号量,直到已经进入的其他线程完成其访问并发出信号。

通过这种方式,即使另一个线程实际上无法访问数据,它也可以“保留”对数据的访问。

此致, 马丁


好的,所以我的代码格式出了问题 :( 在我被人因为愚蠢而点踩之前,我想在这里说一下。 <g> - Martin James
while(true) { critsect.enter(); ... do calculations ... ... maybe call a blocking operation so we sleep ... critsect.leave(); sleep(0); } - Martin James
有些日子,最好根本不要起床 :(( 我尝试在之前的评论中添加反引号代码,但它看起来还是不对 :( - Martin James

0

在我看来,你可以在Linux上使用FIFO调度程序并更改线程的优先级:

thread_func() {
    ... 
    pthread_t t_id = pthread_self();
    struct sched_param prio_zero, prio_one;
    prio_zero.sched_priority = sched_get_priority_min(SCHED_FIFO);
    prio_one.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
    phtread_setschedparam(t_id, SCHED_FIFO, &prio_zero);
    ...
    while(true)
    {
        ... Doing something before
        phtread_setschedparam(t_id, SCHED_FIFO, &prio_one);
        critsect.enter();
        ... do calculations ...
        ... maybe call a blocking operation so we sleep ...
        critsect.leave();
        phtread_setschedparam(t_id, SCHED_FIFO, &prio_zero);
        ... Do something after
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接