为什么有些线程无法收到pthread_cond_broadcast信号?

3

我有一个工作线程池。每个工作线程执行以下例程:

void* worker(void* args){
  ...
  pthread_mutex_lock(&mtx);

  while (queue == NULL && stop == 0){
    pthread_cond_wait(&cond, &mtx);
  }

  el = pop(queue);
  pthread_mutex_unlock(&mtx);

  ...
}

主线程:

int main(){

   ...
   while (stop == 0){
     ...
     pthread_mutex_lock(&mtx);  
     insert(queue, el);
     pthread_cond_signal(&cond);
     pthread_mutex_unlock(&mtx);
     ...
   }
...
}

接下来我有一个信号处理程序,当它收到一个信号时,会执行以下代码:

void exit_handler(){
    stop = 1;   
    pthread_mutex_lock(&mtx);
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mtx); 
}

我省略了声明和初始化过程,但原始代码包含这些内容。

大多数情况下,在接收到信号后一切都正常,但有时似乎一些工作线程会停留在等待循环中,因为它们没有看到变量stop的更改和/或它们没有被广播唤醒。

所以线程永远不会结束。 我遗漏了什么?

编辑:在exit_handler中,将stop=1移至关键部分。问题仍然存在。

编辑2:我在Ubuntu的VM上执行程序。由于代码似乎完全正确,我尝试更改VM和OS(XUbuntu),现在似乎可以正常工作。仍然不知道原因,有人有想法吗?


stop = 1 应该在互斥保护的区域内执行。这是一条注释,因为我看不出它如何引起你所看到的问题。 - caf
我尝试过了,但问题仍然存在。我正在虚拟机上运行程序,这可能会导致一些奇怪的问题吗?因为实际上,代码似乎是正确的。 - Nymeria
你能提供一个 MCVE 吗?正如所指出的,我的答案是错误的,但我对这个问题很感兴趣,因为我正在处理类似的事情(除了我使用的是 phtread_cond_signal 而不是 pthread_cond_broadcast)。我编写了一些测试代码,我认为它做了你描述的事情,但我不认为我看到了你的问题。 - yano
1个回答

2
我猜测您可能对pthread_cond_broadcast的工作方式有误解(至少是我过去遇到的问题之一,但不确定是否适用于您的情况)。来自man page的描述如下:

pthread_cond_broadcast()函数将解除所有当前阻塞在指定条件变量cond上的线程。

好的,这很有道理,_broadcast唤醒了所有当前阻塞在cond上的线程。然而,只有一个被唤醒的线程能够在它们全部唤醒后锁定互斥锁。同样来自man page的描述:

已解除阻塞的线程应根据调度策略(如果适用)争夺互斥锁,并且就像每个线程都调用了pthread_mutex_lock()一样。

这意味着如果有3个线程被阻塞在cond上并且调用了_broadcast,那么所有3个线程都会被唤醒,但只有1个线程能够获取互斥锁。其他2个线程仍然被阻塞在pthread_cond_wait上,等待信号。由于这个原因,它们没有看到stop被设置为1,并且exit_handler(我假设是一个Ctrl+c软件信号?)已经完成了信号处理,所以输掉了_broadcast竞赛的剩余线程陷入了僵局,等待永远不会到来的信号,并且无法读取已经设置的stop标志。
我认为有两个解决方法:
  1. 使用pthread_cond_timedwait。即使没有被信号唤醒,也将在指定的时间间隔内从等待中返回,查看stop == 1,然后退出。
  2. 在您的worker函数末尾添加pthread_cond_signalpthread_cond_broadcast。这样,在线程退出之前,它将发出cond变量的信号,允许任何其他等待线程获取互斥锁并完成处理。如果没有线程在等待条件变量,发出条件变量信号也没有问题,因此即使是最后一个线程,这也应该没问题。

编辑: 这里有一个最小可复现示例,证明了我上面的答案是错误的。当我按下Ctrl + c时,程序会立即退出,这表明所有线程在广播后很快获取了互斥锁,看到 stop 为false,然后退出。然后main加入线程并结束进程。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
#include <signal.h>
#include <unistd.h>


#define NUM_THREADS 3
#define STACK_SIZE 10

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
volatile bool stop = false;
int stack[STACK_SIZE] = { 0 };
int sp = 0; // stack pointer,, also doubles as the current stack size

void SigHandler(int sig)
{
  if (sig == SIGINT)
  {
    stop = true;
  }
  else
  {
    printf("Received unexcepted signal %d\n", sig);
  }
}

void* worker(void* param)
{
  long tid = (long)(param);
  while (stop == false)
  {
    // acquire the lock
    pthread_mutex_lock(&m);
    while (sp <= 0)  // sp should never be < 0
    {
      // there is no data in the stack to consume, wait to get signaled
      // this unlocks the mutex when it is called, and locks the
      // mutex before it returns
      pthread_cond_wait(&c, &m);
    }

    // when we get here we should be guaranteed sp >= 1
    printf("thread %ld consuming stack[%d] = %d\n", tid, sp-1, stack[sp-1]);
    sp--;

    pthread_mutex_unlock(&m);

    int sleepVal = rand() % 10;
    printf("thread %ld sleeping for %d seconds...\n", tid, sleepVal);
    sleep(sleepVal);
  }
  pthread_exit(NULL);
}

int main(void)
{
  pthread_t threads[NUM_THREADS];
  pthread_attr_t attr;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  srand(time(NULL));

  for (long i=0; i<NUM_THREADS; i++)
  {
    int rc = pthread_create(&threads[i], &attr, worker, (void*)i);
    if (rc != 0)
    {
      fprintf(stderr, "Failed to create thread %ld\n", i);
    }
  }

  while (stop == false)
  {
    // produce data in bursts
    int numValsToInsert = rand() % (STACK_SIZE - sp);
    printf("main producing %d values\n", numValsToInsert);
    // acquire the lock
    pthread_mutex_lock(&m);

    for (int i=0; i<numValsToInsert; i++)
    {
      // produce values for the stack
      int val = rand() % 10000;
      // I think this should already be guaranteed..?
      if (sp+1 < STACK_SIZE)
      {
        printf("main pushing stack[%d] = %d\n", sp, val);
        stack[sp++] = val;
        // signal the workers that data is ready
        //printf("main signaling threads...\n");
        //pthread_cond_signal(&c);
      }
      else
      {
        printf("stack full!\n");
      }
    }

    pthread_mutex_unlock(&m);

    // signal the workers that data is ready
    printf("main signaling threads...\n");
    pthread_cond_broadcast(&c);  

    int sleepVal = 1;//rand() % 5;
    printf("main sleeping for %d seconds...\n", sleepVal);
    sleep(sleepVal);    
  }


  for (long i=0; i<NUM_THREADS; i++)
  {
    pthread_join(threads[i], NULL);
  }

  return 0;
}

3
这是不正确的:"另外两个线程仍将被困在pthread_cond_wait中,等待信号。" 他们不会等待信号。它们不再被阻塞在条件变量上。它们只是等待互斥锁变得可用。 - R.. GitHub STOP HELPING ICE
@R.. 嘿,这是我最不确定的部分,因为我不熟悉 pthread_cond_wait 的内部机制。那么这完全是错误的吗?如果它们不再等待信号,而是等待锁定互斥量,那么使用 _broadcast 唤醒的每个线程最终都应该获取互斥量,因为在退出之前获取它的线程将在释放它之前释放它。如果是这样,那么我的答案就没有正确地解决 OP 所描述的问题。 - yano
1
这不是内部实现,而是抽象接口。如果我没有漏掉什么,那么这个答案大体上是错误的。它们应该最终都会获取到互斥锁。如果它们没有获取到,要么是因为第一个获取它的线程从未释放它,要么是因为程序在它们都有机会获取它之前退出了。另一个可能性是,它们都在循环并再次等待condvar,因为获取互斥锁的第一个线程所做的某些事情导致谓词再次变为false。 - R.. GitHub STOP HELPING ICE
1
@yano:感谢更新。我实际上喜欢解决这些问题并改进C/POSIX接口的好信息,以便人们在搜索时能找到;这就是为什么我会评论或回答的原因。 - R.. GitHub STOP HELPING ICE
我觉得在原来错误的答案上加上删除线会很有帮助,因为在看到你的编辑之前,我一直感到困惑和担忧。所以我建议了一个编辑。如果你不想接受它,可以随意拒绝。然而,我认为在顶部放置一个编辑,说明原始答案是错误的,这样读者就不必等到阅读大部分答案才发现它是错误的,这将非常有帮助。 - mgarey
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接