pthread_cond_wait和pthread_mutex_lock的优先级?

3
我是一名有用的助手,可以为您翻译文本。
我有多个读线程和一个写线程。如果我在其中一个读线程上锁定互斥锁并从中发送广播,那么是否保证互斥锁将被等待pthread_cond_wait()的写线程锁定,或者有可能另一个正在等待pthread_mutex_lock()的读线程会锁定互斥锁?主要问题是pthread_cond_wait()是否优先于pthread_mutex_lock()?
如果不是,如何确保在pthread_cond_broadcast()时互斥锁始终由写线程锁定?
示例
读线程:
pthread_mutex_lock(mutex);
pthread_cond_broadcast(cond);
pthread_mutex_unlock(mutex);

编写线程:

pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
1个回答

1
假设读和写两个线程同时到达pthread_mutex_lock。因此,要么写线程在pthread_mutex_lock调用时获取互斥锁,要么读线程获取互斥锁。
如果是写线程,则读线程将在pthread_mutex_lock上等待。通过调用pthread_cond_wait,写线程释放mutex并阻塞在cond上。这是原子完成的。因此,当读线程被授予mutex时,我们可以确定读线程在等待cond。因此,在cond上进行广播会到达写线程,它不再等待cond,但仍在pthread_cond_wait范围内尝试获取mutex(由读线程持有)。在广播cond后,读线程释放mutex,然后转到写线程。因此,写线程最终从pthread_cond_wait中退出,同时保持mutex已锁定。记得稍后解锁它。
如果是读线程,写线程将在pthread_mutex_lock上等待,读线程会在cond上广播信号,然后释放mutex。然后写线程会在pthread_mutex_lock上获取mutex,并立即在pthread_cond_wait上释放它,等待cond(请注意,先前的cond广播对当前的pthread_cond_wait没有影响)。在下一个读线程迭代中,它会在mutex上获取锁,向cond发送广播,然后解锁mutex。这意味着写线程在cond上前进并在mutex上获取锁。
这回答了你关于优先级的问题吗?

评论后更新。

假设我们有一个线程(让我们将其命名为 A 以便以后参考)持有 mutex 上的锁,还有一些其他线程试图获取相同的锁。一旦第一个线程释放了锁,就无法预测哪个线程会获得锁。此外,如果 A 线程有一个循环并尝试重新获取 mutex 上的锁,则有可能授予此锁,并且其他线程将继续等待。添加 pthread_cond_wait 在授予锁的范围内不会改变任何内容。

让我引用 POSIX 规范的片段(请参见 https://stackoverflow.com/a/9625267/2989411 作为参考):

这些函数会原子性地释放互斥锁并使调用线程在条件变量cond上阻塞;此处的“原子性”意味着“相对于另一个线程访问互斥锁和条件变量而言是原子的”。也就是说,如果另一个线程能够在即将阻塞的线程释放互斥锁后获得它,则该线程中随后对pthread_cond_broadcast()或pthread_cond_signal()的调用将表现为在即将阻塞的线程被阻塞之后发出的调用。
这是标准关于操作顺序的唯一保证。授予锁给其他线程的顺序相当不可预测,并且它会根据一些非常微妙的时间波动而改变。
请针对仅涉及互斥锁的代码,请尝试使用以下代码:
#define _GNU_SOURCE
#include <pthread.h>

#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *th(void *arg) {
    int i;
    char *s = arg;
    for (i = 0; i < 10; ++i) {
        pthread_mutex_lock(&mutex);
        printf("%s %d\n", s, i);
        //sleep(1);
        pthread_mutex_unlock(&mutex);
#if 0
        pthread_yield();
#endif
    }
    return NULL;
}

int main() {
    int i;
    for (i = 0; i < 10; ++i) {
        pthread_t t1, t2, t3;
        printf("================================\n");
        pthread_create(&t1, NULL, th, "t1");
        pthread_create(&t2, NULL, th, "     t2");
        pthread_create(&t3, NULL, th, "            t3");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
    }
    return 0;
}

在一台机器上(单CPU),它总是从t3开始显示整个循环,然后是t2,最后是t1。在另一台机器上(2个核心),线程的顺序更随机,但几乎总是在将互斥锁授予其他线程之前为每个线程显示整个循环。很少会出现这样的情况:
t1 8
t1 9
            t3 0
     t2 0
     t2 1
     [removed other t2 output]
     t2 8
     t2 9
            t3 1
            t3 2

通过将#if 0替换为#if 1来启用pthread_yield,观察结果并检查输出。对我来说,它可以使两个线程交替显示其输出,然后第三个线程最终有机会工作。添加另一个或更多的线程。玩一下sleep等。它证实了随机行为。
如果您想进行一些实验,请编译并运行以下代码片段。这是单生产者-多消费者模型的示例。它可以使用两个参数运行:第一个是消费者线程的数量,第二个是要处理的数据系列的长度。如果没有给出参数,则有一个消费者线程和120个项目要处理。我还建议在标记为/* play here */的地方使用sleep/usleep:更改参数值,完全删除sleep,在关键部分移动它(如果适当),或者用pthread_yield替换并观察行为的变化。
#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct data_t {
    int seq;
    int payload;
    struct data_t *next;
};

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;

void push(int seq, int payload) {
    struct data_t *e;
    e = malloc(sizeof(struct data_t));
    e->seq = seq;
    e->payload = payload;
    e->next = NULL;
    if (last == NULL) {
        assert(first == NULL);
        first = last = e;
    } else {
        last->next = e;
        last = e;
    }
}

struct data_t pop() {
    struct data_t res = {0};
    if (first == NULL) {
        res.seq = -1;
    } else {
        res.seq = first->seq;
        res.payload = first->payload;
        first = first->next;
        if (first == NULL) {
            last = NULL;
        }
    }
    return res;
}

void *producer(void *arg __attribute__((unused))) {
    int i;
    printf("producer created\n");
    for (i = 0; i < num_data; ++i) {
        int val;
        sleep(1); /* play here */
        pthread_mutex_lock(&mutex);
        val = rand() / (INT_MAX / 1000);
        push(i, val);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
        printf("prod %3d %3d signaled\n", i, val);
    }
    in_progress = 0;
    printf("prod end\n");
    pthread_cond_broadcast(&cond);
    printf("prod end signaled\n");
    return NULL;
}

void *consumer(void *arg) {
    char c_id[1024];
    int t_id = *(int *)arg;
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id);
    printf("%s created\n", c_id);
    while (1) {
        struct data_t item;
        pthread_mutex_lock(&mutex);
        item = pop();
        while (item.seq == -1 && in_progress) {
            printf("%s waits for data\n", c_id);
            pthread_cond_wait(&cond, &mutex);
            printf("%s got signal\n", c_id);
            item = pop();
        }
        if (!in_progress && item.seq == -1) {
            printf("%s detected end of data.\n", c_id);
            pthread_mutex_unlock(&mutex);
            break;
        }
        pthread_mutex_unlock(&mutex);
        printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
        sleep(item.payload % 10); /* play here */
        printf("%s processed  %3d %3d\n", c_id, item.seq, item.payload);
    }
    printf("%s end\n", c_id);
    return NULL;
}

int main(int argc, char *argv[]) {
    int num_cons = 1;
    pthread_t t_prod;
    pthread_t *t_cons;
    int i;
    int *nums;
    if (argc > 1) {
        num_cons = atoi(argv[1]);
        if (num_cons == 0) {
            num_cons = 1;
        }
        if (num_cons > 99) {
            num_cons = 99;
        }
    }
    if (argc > 2) {
        num_data = atoi(argv[2]);
        if (num_data < 10) {
            num_data = 10;
        }
        if (num_data > 600) {
            num_data = 600;
        }
    }

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
    t_cons = malloc(sizeof(pthread_t) * num_cons);
    nums = malloc(sizeof(int) * num_cons);
    if (!t_cons || !nums) {
        printf("Out of memory!\n");
        exit(1);
    }
    srand(time(NULL));
    pthread_create(&t_prod, NULL, producer, NULL);

    for (i = 0; i < num_cons; ++i) {
        nums[i] = i + 1;
        usleep(100000); /* play here */
        pthread_create(t_cons + i, NULL, consumer, nums + i);
    }

    pthread_join(t_prod, NULL);

    for (i = 0; i < num_cons; ++i) {
        pthread_join(t_cons[i], NULL);
    }
    free(nums);
    free(t_cons);

    return 0;
}

我希望我已经解除了你的疑惑,并为你提供了一些代码来实验和增强对pthread行为的信心。


谢谢回答。不过我不确定它完全回答了我所提出的问题。如果一个写线程正在等待pthread_cond_wait(),而我们有两个读线程,那会发生什么呢?一个读线程设法锁定互斥量并发送广播。在它释放这个互斥量之后,有没有任何保证写线程会接下来锁定互斥量,因为它正在等待广播(同时另一个读线程也在等待锁定互斥量)? - testtest1235
@testtest1235 我已经补充了一些额外的信息和示例来扩展我的答案。 - ArturFH

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接