在pthread中,唤醒单个线程而不是忙等待

4
我不确定标题是否能准确反映我的问题,但这是我能做到的最好的了。我正在尝试在pthreads中实现一个“工作线程”模型。我想要从“main”函数生成一组线程,然后“main”线程将任务委派给工作线程并等待所有线程完成后再分配下一个任务(实际上,要求像CUDA编程模型一样在CPU上排列线程块,但与当前问题无关)。"job"数组用于指示每个线程的工作类型。目前,我使用信号量来实现这一点,但会导致繁忙等待。我正在寻找方法使线程在不断轮询的情况下仅在需要时进入睡眠并唤醒。
每个线程执行的函数
volatile int jobs[MAX_THREADS]; // global job indicator array
sem_t semaphore;                // semaphore to indicate completion
thread_execute(void *args)
{
  tid = get_id(args);
  while(jobs[tid] != -1)
  {
    if(jobs[tid] == 0) continue; // no job
    if(jobs[tid] == JOBS_1)
    {
      jobs1();
      jobs[tid] = 0; // go back to idle state
      sem_post(&semapahore);
    }
    if(jobs[tid] == JOBS_2)
    {
      jobs2();
      jobs[tid] = 0; // go back to idle state
      sem_post(&semapahore);
    }
  }

  pthread_exit(NULL);
}

主要功能如下。
int main()
{
  sem_init(&semaphore, 0, 0);
  jobs[0...MAX_THREADS] = 0;
  spawn_threads();

  // Dispatch first job
  jobs[0...MAX_THREADS] = JOBS_1;
  int semvalue = 0;
  while (semvalue < MAX_THREADS) // Wait till all threads increment the semaphore
    sem_getvalue(&sempaphore, &semvalue);

  sem_init(&semaphore, 0, 0); // Init semaphore back to 0 for the next job
                              // I'm actually using diff. semaphores for diff. jobs
  jobs[0...MAX_THREADS] = JOBS_2;
  while (semvalue < MAX_THREADS)
    sem_getvalue(&sempaphore, &semvalue);

  jobs[0...MAX_THREADS] = -1; // No more jobs
  pthread_join();
}

这种实现的问题在于main线程一直忙于等待所有工作线程完成,并且工作线程也不断轮询任务数组以检查是否有新任务。当线程需要休眠并在需要时唤醒时,是否有更好的方法,类似于信号处理程序和使用pthread_kill(),但这种方法有点混乱,需要单独的信号处理程序。

信号量是阻塞的 - 不应该有忙等待!乍一看,这似乎像是线程微管理 :(( - Martin James
你能否摆脱作业数组,只使用生产者-消费者队列作业对象到线程池 - 这是通常处理此类问题的方式。作业类可以包含一个枚举来告诉线程要做什么(枚举Ecommand {Ejob1,Ejob2,Eterminate})。然后您只需要一个信号量(用于线程等待和计算队列项)和一个互斥锁(用于保护队列免受多次访问)。 - Martin James
我确实需要微观管理线程,而典型的生产者消费者模型并不适用于此目的,但是访问作业队列的基本思想在这里同样有效。@Tudor 给出的代码正是我所寻找的。谢谢! - user1135552
1个回答

5
你可以使用一个条件变量,让线程等待直到收到信号。 查看更多关于条件变量的信息
volatile int jobs[MAX_THREADS]; // global job indicator array
pthread_cond_t th_cond;     // threads wait on this
pthread_mutex_t th_mutex;   // mutex to protect the signal
int busyThreads = MAX_THREADS;

pthread_cond_t m_cond;      // main thread waits on this
pthread_mutex_t m_mutex;    // mutex to protect main signal

thread_execute(void *args)
{
  tid = get_id(args);
  while(jobs[tid] != -1)
  {
    if(jobs[tid] == 0) continue; // no job
    if(jobs[tid] == JOBS_1)
    {
      jobs1();
      jobs[tid] = 0; // go back to idle state
      pthread_mutex_lock(&th_mutex);      
          pthread_mutex_lock(&m_mutex);   
          --busyThreads;                       // one less worker
          pthread_cond_signal(&m_cond);        // signal main to check progress
          pthread_mutex_unlock(&m_mutex);
      pthread_cond_wait(&th_cond, &th_mutex);   // wait for next job
      pthread_mutex_unlock(&th_mutex);      
    }
    if(jobs[tid] == JOBS_2)
    {
      jobs2();
      jobs[tid] = 0; // go back to idle state
      pthread_mutex_lock(&th_mutex);
      --busyThreads;
      pthread_cond_wait(&th_cond, &th_mutex);
      pthread_mutex_unlock(&th_mutex);
    }
  }

  pthread_exit(NULL);
}

然后在主函数中:

int main()
{
  sem_init(&semaphore, 0, 0);
  jobs[0...MAX_THREADS] = 0;
  spawn_threads();

  // Dispatch first job
  jobs[0...MAX_THREADS] = JOBS_1;
  int semvalue = 0;

  pthread_mutex_lock(&m_mutex);
  while(busyThreads > 0)        // check number of active workers
      pthread_cond_wait(&m_cond, &m_mutex);   
  pthread_mutex_unlock(&m_mutex);

  busyThreads = MAX_THREADS;
  pthread_mutex_lock(&th_mutex);
  pthread_cond_broadcast(&th_cond);   // signal all workers to resume
  pthread_mutex_unlock(&th_mutex);

  // same for JOBS_2;

  jobs[0...MAX_THREADS] = -1; // No more jobs
  pthread_join();
}

谢谢!这比忙等好多了 :) - user1135552
@user1135552:你测试过这个代码了吗?我是凭空想出来的,所以不确定它是否百分之百正确。 - Tudor
我使用了你的代码,@Tudor,修改了我的实现。它完美地工作了,现在比繁忙等待实现更快地找到了模式,速度提高了30%!非常感谢 :) - user1135552

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接