pthread broadcast, then wait?

5
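I am trying to set up several threads that sit in a waiting state until they receive a pthread_cond_broadcast().
After finishing a job, I want the threads to go back to their waiting state.
I also want the process that called pthread_cond_broadcast() to wait for all of the threads to return to their waiting state before continuing. In this case it is main() that calls the broadcast. I am trying to do this by having main() call pthread_cond_wait() after calling the broadcast.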
void* Work::job(void* id)
{
    int idx = (long)id;

    while(1)
    {
        pthread_mutex_lock(&job_lock);

        while(!jobs_complete)
        {
            // wait for main to broadcast
            pthread_cond_wait(&can_work, &job_lock);
            pthread_mutex_unlock(&job_lock);

            // work here

            pthread_mutex_lock(&job_lock);
            ++jobs_completed;

            if(jobs_completed == NUM_THREADS)
            {
                jobs_complete = true;
                pthread_cond_signal(&jobs_done);
                pthread_mutex_unlock(&job_lock);
            }
            pthread_mutex_unlock(&job_lock);
        }

        pthread_mutex_unlock(&job_lock);
    }

    return NULL;
}

NUM_THREADS is 4, job_lock is a pthread_mutex_t, can_work and jobs_done are pthread_cond_t, jobs_completed is a bool, and jobs_complete is an int.

// work

jobs_completed = false;
jobs_complete = 0;
pthread_mutex_lock(&job_lock);
pthread_cond_broadcast(&can_work);
pthread_cond_wait(&jobs_complete);
pthread_mutex_unlock(&job_lock);

// work that depends on jobs_complete

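Right now I am doing this by calling pthread_cond_broadcast() and then pthread_cond_wait() right after it, but this seems to deadlock.

Can anyone explain how I should be doing this, or where I went wrong? I'd appreciate any help.

Thanks!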


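It looks like you are trying to set up a crew of threads that all wait for a "go" signal, then collectively complete NUM_THREAD jobs, ideally one job per thread. Once NUM_THREAD jobs are done, they go back to waiting for another go signal. Is that accurate? I have spotted at least five bugs already, but before writing up an answer I want to know exactly what you are trying to do. - WhozCraig
@WhozCraig, yes, that is what I am trying to do. I also want the function that issues the "go" signal to wait for all the jobs to finish before continuing. - noko
2 Answers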

4

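I'm posting this (it is almost all C code, but then so is pthreads, so a little slack is requested) to demonstrate one way to do what I think you are trying to accomplish. Obviously you would want to encapsulate most of this in proper classes and so on. Hopefully it shows you how condition variables, mutexes, and their relationship with predicate management and notification work.

I hope you find it useful. Have a great day.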

#include <cstdint>   // intptr_t
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;

// our global condition variable and mutex
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// our predicate values.
bool finished = false;
int jobs_waiting = 0;
int jobs_completed = 0;

// our thread proc
static void *worker_proc(void* p)
{
    intptr_t id = (intptr_t)p;  // our id
    size_t n_completed = 0;     // our job completion count

    // always latch prior to eval'ing predicate vars.
    pthread_mutex_lock(&mtx);
    while (!finished)
    {
        // wait for finish or work-waiting predicate
        while (!finished && jobs_waiting == 0)
            pthread_cond_wait(&cv, &mtx);

        // we own the mutex, so we're free to look at, modify
        //  etc. the values(s) that we're using for our predicate
        if (finished)
            break;

        // must be a job_waiting, reduce that number by one, then
        //  unlock the mutex and start our work. Note that we're
        //  changing the predicate (jobs_waiting is part of it) and
        //  we therefore need to let anyone that is monitoring know.
        --jobs_waiting;
        pthread_cond_broadcast(&cv);
        pthread_mutex_unlock(&mtx);

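        // DO WORK HERE (this just runs a lame summation; the counters
        //  are unsigned so the overflowing sum wraps instead of being
        //  undefined behavior)
        for (unsigned i=0,x=0;i<1048576; x += ++i);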
        ++n_completed;

        // finished work latch mutex and setup changes
        pthread_mutex_lock(&mtx);
        ++jobs_completed;
        pthread_cond_broadcast(&cv);
    }

    // final report
    cout << id << ": jobs completed = " << n_completed << endl;

    // we always exit owning the mutex, so unlock it now. but
    //  let anyone else know they should be quitting as well.
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    return p;
}

// sets up a batch of work and waits for it to finish.
void run_batch(int num)
{
    pthread_mutex_lock(&mtx);
    jobs_waiting = num;
    jobs_completed = 0;
    pthread_cond_broadcast(&cv);

    // wait for all jobs to complete.
    while (jobs_completed != num)
        pthread_cond_wait(&cv, &mtx);

    // we own this coming out, so let it go.
    pthread_mutex_unlock(&mtx);
}

// main entry point.
int main()
{
    // number of threads in our crew
    static const size_t N = 7;
    pthread_t thrds[N] = {0};

    // startup thread crew.
    intptr_t id = 0;
    for (size_t i=0; i<N; ++i)
        pthread_create(thrds + i, NULL, worker_proc, (void*)(++id));

    // run through batches. each batch is one larger
    //  than the prior batch. this should result in some
    //  interesting job-counts per-thread.
    for (int i=0; i<64; ++i)
        run_batch(i);

    // flag for shutdown state.
    pthread_mutex_lock(&mtx);
    finished = true;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    for (size_t i=0; i<N; pthread_join(thrds[i++], NULL));

    return 0;
}

Sample Output #1

3: jobs completed = 256
6: jobs completed = 282
5: jobs completed = 292
2: jobs completed = 242
1: jobs completed = 339
4: jobs completed = 260
7: jobs completed = 409

Sample Output #2

6: jobs completed = 882
1: jobs completed = 210
4: jobs completed = 179
5: jobs completed = 178
2: jobs completed = 187
7: jobs completed = 186
3: jobs completed = 194

Sample Output #3

1: jobs completed = 268
6: jobs completed = 559
3: jobs completed = 279
5: jobs completed = 270
2: jobs completed = 164
4: jobs completed = 317
7: jobs completed = 159

Fixed Batch Size

Same code, but changing this:

for (int i=0; i<64; ++i)
    run_batch(i);

to this:

for (int i=0; i<64; ++i)
    run_batch(N);

gives the following, which is probably even closer to what you are really looking for:

Sample Output #1

4: jobs completed = 65
2: jobs completed = 63
5: jobs completed = 66
3: jobs completed = 63
1: jobs completed = 64
7: jobs completed = 63
6: jobs completed = 64

Sample Output #2

3: jobs completed = 65
5: jobs completed = 62
1: jobs completed = 67
7: jobs completed = 63
2: jobs completed = 65
6: jobs completed = 61
4: jobs completed = 65

Sample Output #3

2: jobs completed = 58
4: jobs completed = 61
5: jobs completed = 69
7: jobs completed = 68
3: jobs completed = 61
1: jobs completed = 64
6: jobs completed = 67

1
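There are 3 possible successive calls to pthread_mutex_unlock at the end of your function, which results in undefined behavior. You don't actually need the two inner ones. If jobs_complete is true, the thread exits the loop and releases the lock; otherwise it loops again and needs the lock for the wait on the can_work condition. Also, with this call: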
 pthread_cond_wait(&jobs_complete);

you probably meant:

pthread_cond_wait(&jobs_complete,&job_lock);

Furthermore, that function expects a pthread_cond_t * and a pthread_mutex_t *, not an int, so even then the code is clearly broken.
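For reference, here is a minimal sketch of what a well-formed wait could look like: the condition variable goes first, the mutex second, and the call sits in a loop that re-checks a predicate. The names all_jobs_done, mark_done, wait_until_done and demo_wait are made up for this illustration and are not from the question.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical names for illustration; they only mirror the question's intent.
static pthread_mutex_t job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  jobs_done = PTHREAD_COND_INITIALIZER;
static bool all_jobs_done = false;   // the predicate, guarded by job_lock

// Stand-in for the last worker: set the predicate, then signal.
static void* mark_done(void*)
{
    pthread_mutex_lock(&job_lock);
    all_jobs_done = true;
    pthread_cond_signal(&jobs_done);
    pthread_mutex_unlock(&job_lock);
    return NULL;
}

// Stand-in for main(): block until the predicate becomes true.
static void wait_until_done()
{
    pthread_mutex_lock(&job_lock);
    while (!all_jobs_done)                         // re-check after every wakeup
        pthread_cond_wait(&jobs_done, &job_lock);  // condition first, mutex second
    pthread_mutex_unlock(&job_lock);
}

// Starts one helper thread, waits for it, and returns the final predicate.
static bool demo_wait()
{
    pthread_t t;
    int rc = pthread_create(&t, NULL, mark_done, NULL);
    assert(rc == 0);
    (void)rc;                 // keeps NDEBUG builds warning-free
    wait_until_done();
    pthread_join(t, NULL);
    return all_jobs_done;
}
```

The loop matters as much as the argument list: pthread_cond_wait may return without the predicate having changed, so the predicate is what decides whether to continue.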
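Note that a signal or broadcast on a condition variable only affects threads that are already waiting on it. The signal is not retained for future waiters. So when the threads loop on jobs_complete, block, and wait again, they have to be signaled again to resume work.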
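One common way to cope with this is to record the "go" in shared state and let waiters test that state instead of relying on the wakeup itself. The sketch below assumes a generation counter bumped once per broadcast; generation, post_go, wait_for_go and demo_late_waiter are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <pthread.h>

static pthread_mutex_t job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  can_work = PTHREAD_COND_INITIALIZER;
static unsigned long generation = 0;   // hypothetical: bumped once per "go"

// Coordinator side: record the "go" in shared state, then broadcast.
static void post_go()
{
    pthread_mutex_lock(&job_lock);
    ++generation;
    pthread_cond_broadcast(&can_work);  // wakes only threads waiting right now
    pthread_mutex_unlock(&job_lock);
}

// Worker side: wait for a generation newer than the last one this worker saw.
// Returns how many times the caller actually blocked in pthread_cond_wait.
static int wait_for_go(unsigned long& last_seen)
{
    int waits = 0;
    pthread_mutex_lock(&job_lock);
    while (generation == last_seen) {
        pthread_cond_wait(&can_work, &job_lock);
        ++waits;
    }
    assert(generation != last_seen);    // loop exit means a new "go" was posted
    last_seen = generation;
    pthread_mutex_unlock(&job_lock);
    return waits;
}

// Broadcast first, wait afterwards: the wakeup is gone, the counter is not.
static int demo_late_waiter()
{
    unsigned long seen = 0;
    post_go();                 // nobody is waiting yet
    return wait_for_go(seen);  // does not block, because generation != seen
}
```

A worker that arrives after the broadcast sees the changed counter and proceeds without blocking, so the ordering between the broadcast and the wait stops mattering.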
Another thing: you say that jobs_complete is an int and jobs_completed is a bool, yet your code doesn't seem to agree:
        if(jobs_completed == NUM_THREADS)
        {
            jobs_complete = true;
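A sketch of how that bookkeeping could look with the types matching their use, assuming jobs_completed is meant to be the counter and jobs_complete the flag (the reverse of the description in the question). The helper finish_job is a hypothetical name; note that it locks once and unlocks once on every path.

```cpp
#include <cassert>
#include <pthread.h>

static const int NUM_THREADS = 4;
static pthread_mutex_t job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  jobs_done = PTHREAD_COND_INITIALIZER;

static int  jobs_completed = 0;      // counter: workers finished in this batch
static bool jobs_complete  = false;  // flag: true once every worker has finished

// Called by a worker after its unit of work.
// Returns true if this call completed the batch.
static bool finish_job()
{
    pthread_mutex_lock(&job_lock);
    assert(jobs_completed < NUM_THREADS);
    ++jobs_completed;
    const bool last = (jobs_completed == NUM_THREADS);
    if (last) {
        jobs_complete = true;
        pthread_cond_signal(&jobs_done);  // wake the thread waiting for the batch
    }
    pthread_mutex_unlock(&job_lock);      // the only unlock, reached on every path
    return last;
}
```

The coordinator would reset both variables under job_lock before starting the next batch.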

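Here is my advice: learn the abstract models of semaphores and barriers, and if you can, use an existing implementation (boost, or std in C++11); otherwise re-implement them with the pthread API. That will be much easier to handle than manipulating condition variables directly. Look for existing solutions on this site. For example, this question deals with a very similar problem, and the solution I provided there can easily be modified to use the pthread API to fit your requirements.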
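To give a rough idea of the barrier approach, here is a sketch that uses pthread_barrier_t directly rather than a re-implementation. That type is an optional part of POSIX and some platforms do not provide it, so treat its availability as an assumption. The names gate, shutting_down, total_jobs and run_rounds are made up for this illustration. One barrier sized for the workers plus the coordinator is crossed twice per round: once for "go" and once for "done".

```cpp
#include <cassert>
#include <pthread.h>

static const int NUM_THREADS = 4;    // workers, as in the question
static pthread_barrier_t gate;       // shared by the workers and the coordinator
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
static int  total_jobs = 0;          // guarded by count_lock
static bool shutting_down = false;   // written only while workers are parked at the gate

static void* worker(void*)
{
    for (;;) {
        pthread_barrier_wait(&gate);       // wait for "go"
        if (shutting_down)
            break;
        pthread_mutex_lock(&count_lock);
        ++total_jobs;                      // stand-in for real work
        pthread_mutex_unlock(&count_lock);
        pthread_barrier_wait(&gate);       // report "done"
    }
    return NULL;
}

// Runs the given number of rounds and returns the number of jobs performed.
static int run_rounds(int rounds)
{
    pthread_t threads[NUM_THREADS];
    int rc = pthread_barrier_init(&gate, NULL, NUM_THREADS + 1);
    assert(rc == 0);
    (void)rc;                              // keeps NDEBUG builds warning-free
    for (int i = 0; i < NUM_THREADS; ++i)
        pthread_create(&threads[i], NULL, worker, NULL);

    for (int r = 0; r < rounds; ++r) {
        pthread_barrier_wait(&gate);       // "go": releases all workers
        pthread_barrier_wait(&gate);       // returns once every worker is done
    }

    shutting_down = true;                  // workers are all blocked at the "go" gate
    pthread_barrier_wait(&gate);           // release them so they see the flag
    for (int i = 0; i < NUM_THREADS; ++i)
        pthread_join(threads[i], NULL);
    pthread_barrier_destroy(&gate);
    return total_jobs;
}
```

With this layout there is no predicate to manage by hand: the coordinator cannot get past the second barrier until every worker has reached it, which is the "wait for everyone to finish" behavior asked for in the question.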

