假设读和写两个线程同时到达
pthread_mutex_lock
。因此,要么写线程在
pthread_mutex_lock
调用时获取互斥锁,要么读线程获取互斥锁。
如果是写线程,则读线程将在
pthread_mutex_lock
上等待。通过调用
pthread_cond_wait
,写线程释放
mutex
并阻塞在
cond
上。这是原子完成的。因此,当读线程被授予
mutex
时,我们可以确定读线程在等待
cond
。因此,在
cond
上进行广播会到达写线程,它不再等待
cond
,但仍在
pthread_cond_wait
范围内尝试获取
mutex
(由读线程持有)。在广播
cond
后,读线程释放
mutex
,然后转到写线程。因此,写线程最终从
pthread_cond_wait
中退出,同时保持
mutex
已锁定。记得稍后解锁它。
如果是读线程,写线程将在
pthread_mutex_lock
上等待,读线程会在
cond
上广播信号,然后释放
mutex
。然后写线程会在
pthread_mutex_lock
上获取
mutex
,并立即在
pthread_cond_wait
上释放它,等待
cond
(请注意,先前的
cond
广播对当前的
pthread_cond_wait
没有影响)。在下一个读线程迭代中,它会在
mutex
上获取锁,向
cond
发送广播,然后解锁
mutex
。这意味着写线程在
cond
上前进并在
mutex
上获取锁。
这回答了你关于优先级的问题吗?
评论后更新。
假设我们有一个线程(让我们将其命名为 A
以便以后参考)持有 mutex
上的锁,还有一些其他线程试图获取相同的锁。一旦第一个线程释放了锁,就无法预测哪个线程会获得锁。此外,如果 A
线程有一个循环并尝试重新获取 mutex
上的锁,则有可能授予此锁,并且其他线程将继续等待。添加 pthread_cond_wait
在授予锁的范围内不会改变任何内容。
让我引用 POSIX 规范的片段(请参见 https://stackoverflow.com/a/9625267/2989411 作为参考):
这些函数会原子性地释放互斥锁并使调用线程在条件变量cond上阻塞;此处的“原子性”意味着“相对于另一个线程访问互斥锁和条件变量而言是原子的”。也就是说,如果另一个线程能够在即将阻塞的线程释放互斥锁后获得它,则该线程中随后对pthread_cond_broadcast()或pthread_cond_signal()的调用将表现为在即将阻塞的线程被阻塞之后发出的调用。
这是标准关于操作顺序的唯一保证。授予锁给其他线程的顺序相当不可预测,并且它会根据一些非常微妙的时间波动而改变。
请针对仅涉及互斥锁的代码,请尝试使用以下代码:
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *th(void *arg) {
int i;
char *s = arg;
for (i = 0; i < 10; ++i) {
pthread_mutex_lock(&mutex);
printf("%s %d\n", s, i);
pthread_mutex_unlock(&mutex);
#if 0
pthread_yield();
#endif
}
return NULL;
}
int main() {
int i;
for (i = 0; i < 10; ++i) {
pthread_t t1, t2, t3;
printf("================================\n");
pthread_create(&t1, NULL, th, "t1");
pthread_create(&t2, NULL, th, " t2");
pthread_create(&t3, NULL, th, " t3");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
}
return 0;
}
在一台机器上(单CPU),它总是从t3开始显示整个循环,然后是t2,最后是t1。在另一台机器上(2个核心),线程的顺序更随机,但几乎总是在将互斥锁授予其他线程之前为每个线程显示整个循环。很少会出现这样的情况:
t1 8
t1 9
t3 0
t2 0
t2 1
[removed other t2 output]
t2 8
t2 9
t3 1
t3 2
通过将
#if 0
替换为
#if 1
来启用
pthread_yield
,观察结果并检查输出。对我来说,它可以使两个线程交替显示其输出,然后第三个线程最终有机会工作。添加另一个或更多的线程。玩一下sleep等。它证实了随机行为。
如果您想进行一些实验,请编译并运行以下代码片段。这是单生产者-多消费者模型的示例。它可以使用两个参数运行:第一个是消费者线程的数量,第二个是要处理的数据系列的长度。如果没有给出参数,则有一个消费者线程和120个项目要处理。我还建议在标记为
/* play here */
的地方使用sleep/usleep:更改参数值,完全删除sleep,在关键部分移动它(如果适当),或者用pthread_yield替换并观察行为的变化。
#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
struct data_t {
int seq;
int payload;
struct data_t *next;
};
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;
void push(int seq, int payload) {
struct data_t *e;
e = malloc(sizeof(struct data_t));
e->seq = seq;
e->payload = payload;
e->next = NULL;
if (last == NULL) {
assert(first == NULL);
first = last = e;
} else {
last->next = e;
last = e;
}
}
struct data_t pop() {
struct data_t res = {0};
if (first == NULL) {
res.seq = -1;
} else {
res.seq = first->seq;
res.payload = first->payload;
first = first->next;
if (first == NULL) {
last = NULL;
}
}
return res;
}
void *producer(void *arg __attribute__((unused))) {
int i;
printf("producer created\n");
for (i = 0; i < num_data; ++i) {
int val;
sleep(1);
pthread_mutex_lock(&mutex);
val = rand() / (INT_MAX / 1000);
push(i, val);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond);
printf("prod %3d %3d signaled\n", i, val);
}
in_progress = 0;
printf("prod end\n");
pthread_cond_broadcast(&cond);
printf("prod end signaled\n");
return NULL;
}
void *consumer(void *arg) {
char c_id[1024];
int t_id = *(int *)arg;
sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id);
printf("%s created\n", c_id);
while (1) {
struct data_t item;
pthread_mutex_lock(&mutex);
item = pop();
while (item.seq == -1 && in_progress) {
printf("%s waits for data\n", c_id);
pthread_cond_wait(&cond, &mutex);
printf("%s got signal\n", c_id);
item = pop();
}
if (!in_progress && item.seq == -1) {
printf("%s detected end of data.\n", c_id);
pthread_mutex_unlock(&mutex);
break;
}
pthread_mutex_unlock(&mutex);
printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
sleep(item.payload % 10);
printf("%s processed %3d %3d\n", c_id, item.seq, item.payload);
}
printf("%s end\n", c_id);
return NULL;
}
int main(int argc, char *argv[]) {
int num_cons = 1;
pthread_t t_prod;
pthread_t *t_cons;
int i;
int *nums;
if (argc > 1) {
num_cons = atoi(argv[1]);
if (num_cons == 0) {
num_cons = 1;
}
if (num_cons > 99) {
num_cons = 99;
}
}
if (argc > 2) {
num_data = atoi(argv[2]);
if (num_data < 10) {
num_data = 10;
}
if (num_data > 600) {
num_data = 600;
}
}
printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
t_cons = malloc(sizeof(pthread_t) * num_cons);
nums = malloc(sizeof(int) * num_cons);
if (!t_cons || !nums) {
printf("Out of memory!\n");
exit(1);
}
srand(time(NULL));
pthread_create(&t_prod, NULL, producer, NULL);
for (i = 0; i < num_cons; ++i) {
nums[i] = i + 1;
usleep(100000);
pthread_create(t_cons + i, NULL, consumer, nums + i);
}
pthread_join(t_prod, NULL);
for (i = 0; i < num_cons; ++i) {
pthread_join(t_cons[i], NULL);
}
free(nums);
free(t_cons);
return 0;
}
我希望我已经解除了你的疑惑,并为你提供了一些代码来实验和增强对pthread行为的信心。