父线程不等待工作线程完成任务

4

在我的程序中,主线程创建了4个(或更多)工作线程。在某个时刻,父线程必须等待工作线程完成一些计算。这些线程在无限循环中运行,因此我不能使用pthread_join(.., ..) POSIX函数来等待工作线程完成。所以我使用了一个全局计数器和一个条件变量。

主线程代码

unsigned jobs = 0; // global variable

// global mutex and cv. They get initialised in my main.
pthread_mutex_t counter_mutex;
pthread_cond_t  counter_cv;

static void process(..){

    jobs = myArray.size(); 

    // I am using a function here that broadcasts a cv in order to
    // wake up the workers

    pthread_mutex_lock(&counter_mutex); // lock counter
        while (jobs > 0){
            pthread_cond_wait(&counter_cv, &counter_mutex); // PARENT SHOULD GET STUCK HERE TILL WORKERS ARE DONE
        }
        // cout << "Workers are done" << endl;
    pthread_mutex_unlock(&counter_mutex);   // unlock counter
}

工作线程代码

 extern unsigned jobs;
 extern pthread_mutex_t counter_mutex;
 extern pthread_cond_t  counter_cv;

 void *run() {

    for (int i = 0;; i++) {

        // do some calculations here

        pthread_mutex_lock(&counter_mutex); // lock counter
            jobs--;
            if (jobs == 0){
                pthread_cond_signal(&counter_cv);
                cout << "All jobs are done" << endl;
            }
        pthread_mutex_unlock(&counter_mutex);   // unlock counter

    }
}    

问题在于有时我的主线程不会被阻塞在条件变量上等待工作线程,这导致有时会出现分段错误。是否存在我看不到的竞争条件导致了这个问题?


jobs = myArray.size(); 这是队列/数组中工人弹出并计算的项目数量。 - pirox22
线程始终保持活动状态,但会在另一个条件变量处被阻塞,直到主线程将项目推送到队列/数组。 - pirox22
无论主线程是否等待,直到jobs为0(或更少)之前,它都不应绕过while循环。您应该在任何地方记录jobs的值,并在更改jobs之前和之后查看输出。也许这将有助于缩小问题范围。 - Kevin
你的工作线程比任务还多吗? - Kevin
3个工作,4个线程。我唤醒它们全部。3个工作线程从队列中弹出。第四个弹出什么?然后第三个将工作计数器减少到0,然后发出信号给主线程,第四个导致sigsegv。 - pirox22
显示剩余5条评论
2个回答

0

我看到你的代码存在一个问题,那就是每个工作程序都会不停地运行(如果你的处理过程不涉及计时器/IO操作,有可能会占用100%的CPU),并且一旦没有更多的任务后也不会终止。

在实际情况中,你的工作程序应该从队列或类似的地方获取到工作请求,并在没有可处理的工作时阻塞(等待可用的工作或某个终止命令)。

我没有看到这里存在竞争条件。唯一的问题是作业计数器的系统性递减。如果你认为它不可能成为负数,那么很快它就会变成负数,这可能会引起各种麻烦。为了避免这种情况,只需要在递减之前将计数器测试为零即可。


0

你的代码存在明显的并发问题。为了简单起见,假设我们有两个工人和两个任务。以下情况可能发生:

  • 任务计数设置为2
  • 父进程被卡在等待条件变量上
  • 两个工人在他们的无限循环中开始处理迭代
  • 第一个工人由于某种原因更快地完成了任务,并执行了锁定的作业计数更新。作业计数减少到1,因此父进程被留下等待,锁被释放。然后工人开始下一次迭代。不幸的是,在现实中没有剩余的工作(1已经完成,1正在由第二个工人执行)。但它开始了一个假想的工作(也许是在同时删除或超出范围的数组元素中寻址?)
  • 第二个工人完成了它的工作,作业计数减少到0,导致父进程被唤醒,持有锁,退出循环并解锁。
  • 你注意到,虽然父进程认为它已经结束了,但第一个工人仍在处理一个假想的任务,可能尝试访问myArray,而它已经被清理,或者其他任何可能出错的事情。

所以你有两个机会进行比赛和/或segfalts:在父进程中和仍然活跃的工作进程中,忙于不存在的任务。

我认为如果您开始循环获取锁定、检查是否仍有剩余作业并预先减少作业计数,那么工作循环将会更加安全,这样同行工作者就会知道实际剩余的工作量:

void *run() {

    for (int i = 0;; i++) {
        pthread_mutex_lock(&counter_mutex); // lock counter
            if (jobs == 0){
                pthread_cond_signal(&counter_cv);
                cout << "All jobs are done" << endl;
            }
            else jobs--;
        pthread_mutex_unlock(&counter_mutex);   // unlock counter

        // do some calculations here

    }
}   

优点是工人只在确实还有工作的情况下才工作。唯一的问题是,当第一个工人失业时,父进程会被唤醒。但其他工人仍然可能在运行。
如果这是一个问题,你可以维护一个仍在运行的任务计数器,并让父进程循环执行“(jobs>0 || active_jobs>0)”代码。

代码中有一部分我没有在帖子中添加,其中工作线程从队列/数组中弹出时会检查其大小。因此,如果队列为空,则无法弹出。因此,当第一个线程弹出时,大小现在为1,但作业尚未减少。第二个线程弹出,现在大小为0。第一个线程结束得更快,但将作业减少到1,但它不会再次弹出,因为队列的大小为0。我认为我的问题是由程序的另一部分中条件变量上的“虚假唤醒”引起的。我进行了编辑,看起来像是可以工作了。谢谢。 - pirox22
@pirox22,队列的弹出和大小检查是否受到互斥锁的保护? - Christophe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接