如何确保线程被阻塞?

3

我有一个多线程的C语言基准测试程序,可以描述如下:

Thread 1   Thread 2   Thread 3       Control thread

while(1)   while(1)    while(1)       while(1)
   |          |          |             
   |          |          |                |             
   |          |          |            every one second: 
   |          |          |               wait for other threads to be blocked
   |          |          |               do something with S values
   |          |          |                |             
   |          |          |                |             
 write S1    write S2   write S3          |
   |          |          |                |          
   |          |          |                |
 barrier     barrier   barrier         barrier

我的问题涉及上图中的等待其他线程被阻塞语句。目前,我想到了以下解决方案来实现它:

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <inttypes.h>

#define NB_THREADS 11

pthread_barrier_t b;
uint8_t blocked_flags[NB_THREADS] = {0};
pthread_mutex_t blocked_flags_mutexes[NB_THREADS];
uint64_t states[NB_THREADS] = {0};

uint64_t time_diff_get(struct timespec *start, struct timespec *end) {
  uint64_t end_ns = end->tv_sec * 1E9 + end->tv_nsec;
  uint64_t start_ns = start->tv_sec * 1E9 + start->tv_nsec;
  uint64_t res = end_ns - start_ns;
  return res;
}

static void *worker_thread(void *arg) {
  uint8_t id = *((uint8_t *)arg);
  int a =  0;
  while(1) {
    for (int i = 0; i < 1000; i++) {
      a++;
    }
    states[id]++;
    pthread_mutex_lock(&blocked_flags_mutexes[id]);
    blocked_flags[id] = 1;
    pthread_mutex_unlock(&blocked_flags_mutexes[id]);
    pthread_barrier_wait(&b);
    pthread_mutex_lock(&blocked_flags_mutexes[id]);
    blocked_flags[id] = 0;
    pthread_mutex_unlock(&blocked_flags_mutexes[id]);
  }
  printf ("a = %d\n", a);
  return NULL;
}

static void *control_thread() {

  struct timespec last_time;
  clock_gettime(CLOCK_REALTIME, &last_time);

  while(1) {

    struct timespec time;
    clock_gettime(CLOCK_REALTIME, &time);
    if (time_diff_get(&last_time, &time) >= 1E9) {

      // Wait for all threads to be blocked
      for (int i = 0; i < NB_THREADS; i++) {
        while (1) {
          pthread_mutex_lock(&blocked_flags_mutexes[i]);
          if (blocked_flags[i] == 1) {
            pthread_mutex_unlock(&blocked_flags_mutexes[i]);
            break;
          }
          pthread_mutex_unlock(&blocked_flags_mutexes[i]);
        }
      }
      for (int i = 0; i < NB_THREADS; i++) {
        pthread_mutex_lock(&blocked_flags_mutexes[i]);
        if (blocked_flags[i] == 0) {
          printf("How could I avoid to be there ??\n");
          exit(-1);
        }
        pthread_mutex_unlock(&blocked_flags_mutexes[i]);
      }

      // Do some intersting stuff here with states array
      // .....
      // .....

      // Save last time
      clock_gettime(CLOCK_REALTIME, &last_time);
    }

    pthread_barrier_wait(&b);
  }
  return NULL;
}

int main() {

  // Init barrier
  pthread_barrier_init(&b, NULL, NB_THREADS + 1);

  // Create worker threads
  pthread_t threads[NB_THREADS];
  uint8_t ids[NB_THREADS];
  for (int i = 0; i < NB_THREADS; i++) {
    ids[i] = i;
    pthread_mutex_init(&blocked_flags_mutexes[i], NULL);
  }
  for (int i = 0; i < NB_THREADS; i++) {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(i + 1, &cpu_set);
    pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpu_set);
    pthread_create(&threads[i], &attr, worker_thread, &ids[i]);
  }

  // Create control thread
  pthread_t ctrl_thread;
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  cpu_set_t cpu_set;
  CPU_ZERO(&cpu_set);
  CPU_SET(0, &cpu_set);
  pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpu_set);
  pthread_create(&ctrl_thread, &attr, control_thread, NULL);

  // Join on worker threads
  for (int i = 0; i < NB_THREADS; i++) {
    pthread_join(threads[i], NULL);
  }

  return 0;
}

但是,使用gcc -O0编译的此基准测试在12核Intel平台上运行时,明显显示出我有一个"竞争"问题,因为进程总是在几秒钟后退出并显示消息。我该如何解决?

注意:根据其他问题,我考虑使用自定义屏障,但我需要继续使用pthread_barrier而不是在mutex和cond变量之上重新实现的barrier。


1
你应该真正检查你调用的函数的返回值,这样你就知道它们是否成功了。 - Robert Allan Hennigan Leahy
1
一个计数信号量可能比多个互斥锁更有效。 - Fiddling Bits
3个回答

1
你的代码存在明显的竞态条件。当线程被栅栏等待解除阻塞时,它们会将标志重置为零。在这之前,它们的标志仍然保持为1一段时间。控制线程可能会观察到这个过期的值为1,并认为相应的线程已经准备好阻塞,而实际上该线程只是刚刚从栅栏等待中出来并准备清除标志:
// worker thread
pthread_barrier_wait(&b);
// No longer blocked, but blocked_flags[id] is still 1.
// At this point, the control thread grabs the mutex, and observes the 1 value
// The mistake is thinking that 1 means "I'm about to block"; it actually
// means, "I'm either about to block on the barrier, or have just finished".
pthread_mutex_lock(&blocked_flags_mutexes[id]);
blocked_flags[id] = 0;
pthread_mutex_unlock(&blocked_flags_mutexes[id]);

这种竞争条件足以有时欺骗控制线程,使其通过第一个循环。然后它陷入第二个循环,在那里发现并非所有标志都为零。
你的问题的本质是,一群线程执行一些重复的、循环的并行处理,由屏障控制。然而,你只在循环中使用了单个屏障等待,这意味着循环只有一个阶段。然而,从语义上讲,你的循环分为两个阶段:线程被阻塞和未被阻塞。你构建的区分这些阶段的机制不是线程安全的;显而易见的解决方案是再次使用屏障将循环分成更多的阶段。
POSIX屏障具有“串行线程”功能:其中一个等待线程被告知它是特殊的。这使得你可以实现特殊阶段,在这些阶段中,只有串行线程执行一些重要的操作,其他线程可以做其他事情,比如调用屏障等待跳转到下一个阶段。这应该消除实现hack的需要,比如通过这种方式使用标志来猜测其他线程何时变得静止。

注意:在POSIX屏障等待中,您无法选择哪个线程是串行线程,因此您不能为该操作专门创建控制线程。相反,您只使用N个线程而不是N+1个线程。它们都执行相同的操作,当它们到达屏障时,可以告诉其中任何一个线程它是串行线程。基于此,串行线程将执行与其他线程不同的某些替代代码。

因此,现在是图表时间:

while(1)   while(1)    while(1)       
   |          |          |             
   |          |          |          
   |          |          | 
   |          |          |   <---- WRITE PHASE  
   |          |          |  
   |          |          |             
   |          |          |                 
 write S1    write S2   write S3
   |          |          |           
   |          |          |      
 barrier     barrier   barrier 
   |          |          |        
   |          |          |     <--- CHECK PHASE
   |          |          |           
   |          |     serial thread!   
   |          |          |           
   |          |       next second?-- YES -> do something with S values!
   |          |          |  NO        |
   |          |          |            |
   |          |          +------------+ 
   |          |          | 
 barrier     barrier   barrier
   |          |          | 
   |          |          | 

back to top, next WRITE PHASE.

在“检查阶段”中,串行线程(可以是N个线程中的任何一个)执行检查:自上次时间转换到下一秒以来,时间是否已经转换到下一秒?如果是,则对S值进行操作。
“屏障”确保其他线程不会触及“检查阶段”中的值,因此串行线程无需使用互斥锁来处理S值!您已经通过每个循环中的额外屏障调用付出了这种同步的代价。
您可以拥有一个额外的线程提供时间基础:它的工作是休眠直到下一秒到达,然后增加一个计数器。串行线程只需要检查该计数器是否已经增加(相对于其旧值,在另一个变量中存储)。然后执行动作并更新旧计数器以匹配新计数器。这样,您就不必在主处理循环中调用操作系统获取当前时间。

0

不要为每个工作线程保留一个标志,而是可以对单个计数器进行互斥保护,当每个工作线程即将被阻塞时,它可以增加此计数器,并在障碍释放后减少它。这可以避免你等待第一个线程被阻塞,然后等待第二个线程,然后是第三个线程等等。

我没有看到你的控制线程如何退出(除了在意外情况下),主线程似乎也没有等待它。

也许你还想在工作线程之前创建控制线程。

你可能还想通过让工作线程和控制线程等待障碍来同步它们,在被释放并开始实际工作之前。


0

我认为可能发生的情况是:

  • 在control_thread()的第一次执行while(1)时,time_diff_get(&last_time, &time)返回一个小于1E9的值,因此线程直接进入屏障
  • 现在所有工作线程最终都会进入屏障
  • 在这种情况发生后,control_thread()执行它的循环第二次,并立即检查blocked_flags[i]
  • 如果在某个线程重置其标志之前至少有一个线程发生这种情况,则会出现您所期望的行为。

很抱歉我目前无法提供解决方案,但如果我正确理解了问题,那么解决问题已经迈出了良好的一步。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接