C++11线程:多个线程等待条件变量

10
我目前正在解决一个模拟扩展的生产者-消费者模型的问题。在这个问题中,有3个工人和3个可用的工具。为了让工人工作,他们需要2个工具(材料不相关)。如果保险库里有2个或以上的工具,工人将取走2个。否则,他们将等待条件变量被信号通知,当保险库里有2个或以上的工具时再行动。
对于有2个工人的情况,情况还好:一个工人会工作然后把工具还回保险库,另一个等待的工人会醒来并取走两个工具。但是问题在于,对于有3个工人的情况,总会有一个工人因得不到工具而挨饿。
经过一些测试,我注意到等待条件变量的线程是以堆栈形式结构化的。是否有任何可能使之成为队列形式呢?(1等待,2等待,3等待。当1被唤醒并想要做另一个时,他必须排在2和3的后面等待。)
这是一个示例输出。代码太长了,所以如果真有必要,我会发布它。有3个工人线程和1个工具互斥体。每次运行都会有不同的工人挨饿。
1 Tools taken. Remaining: 1
2 Waiting on tools...
3 Waiting on tools...
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
...
< p>(正如您所看到的,2永远不会得到工具...)

更新:2013/07/05 我添加了一些代码。

int tools = 3; //global
string last;   //current last product on output buffer
mutex toolsMutex;
mutex matSearchMutex;

int main() {
    //Initializing Producers
    Producer prod1(1);
    Producer prod2(2);
    Producer prod3(3);

    thread p1(processor,1);
    thread p2(processor,2);
    thread p3(processor,3);

    p1.detach();
    p2.detach();
    p3.detach();

    while(true) {//forever running
    }

    return 0;
}

处理器:

//Processor method
void processor(int i) {
    srand(time(NULL)); 

    while (true) { //forever running

        bool hasTools = false;
        bool productMade = false;
        while (productMade == false) { //while product has yet to be made.
            //choose what to make...
            if (hasTools == false) {
                thread matT(getMaterials,whatToMake);
                thread toolT(getTools,i);
                toolT.join();
                matT.join();
                hasTools = true;
            }
            else { //tools acquired but no materials
                thread matT(getMaterials,whatToMake);
                matT.join();
            }

            if (recordedLast.compare(last) != 0) {
                
                //return materials and acquire new ones the next run
                
                continue;
            }
            else {
                makeProduct(whatToMake);
                unique_lock<mutex> locker(toolMutex); 
                tools = tools + 2;
                cout << i << " Operator Product made. Tools returned. Tools now:" << tools << endl;
                productMade = true;
                if (tools >=2)
                    toolsCV.notify_one();
            }
            
            //done processing
        
        }
    }
}

制作产品:

void makeProduct(int i) {
    unique_lock<mutex> mainMatLock(matSearchMutex); 
    // make product according to i
    this_thread::sleep_for(chrono::milliseconds(rand() % 1000 + 10));   
}

获取工具:

void getTools(int i) {
    unique_lock<mutex> locker(toolMutex); 
    if (tools <2) {
        cout << i << " Waiting on tools..." << endl;
        toolsCV.wait(locker);
    }
    tools = tools - 2;//tools acquired
    cout << i <<" Tools taken. Remaining: " << tools << endl;
}

感谢回复。今晚我会尝试使用多个条件变量实现等待队列。

(附注:在Stack Overflow上有更好的代码格式化方法吗?除了用四个空格...)


为什么不发布您的实际代码?您是否尝试在条件变量上进行广播(而不是信号)?您没有指定有关您的环境的任何信息,但可能需要查看http://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_08_04_01 - Tony Delroy
2
"在Stack Overflow上有更好的代码格式化方式吗?" 输入代码后,选中整个代码块,然后按Ctrl-K或按编辑框上方的**{}**按钮。 - Jonathan Wakely
4个回答

14

std::condition_variable不指定在调用notify_one时唤醒哪个等待的线程。因此,您应编写不关心唤醒哪个线程的代码。标准模式是唤醒哪个线程,哪个线程就执行需要完成的工作。

如果要求以特定顺序唤醒线程,则使用不同的机制。例如,可以为每个线程创建一个单独的std::condition_variable,并在它们需要工具时将它们放入队列中。当线程交还工具时,它可以使用相应于队列前面线程的条件变量发出信号。那个线程将被唤醒,其他线程将保持睡眠状态(除了虚假唤醒)。


3
当有多个线程等待某一个条件时,唤醒它们的顺序(使用notify_all)或者唤醒哪一个(使用notify_one)是不确定的。如果你需要某种排序方式,你需要自行实现使用notify_all的方法。你可以维护一个等待线程队列:在等待之前(但在获取互斥锁之后),将线程ID推入队列末尾。在循环中,以“队列中的这个线程和必要工具可用”为循环条件。当你获得了必要工具时,从队列前面移除该ID,然后再次调用notify_all

1

一种方法可能是使用一个完整的信号量,它在线程之间共享,而不是使用条件变量。这样,您可以等待特定数量。

#include <mutex>
#include <thread>
#include <condition_variable>

using std::mutex;
using std::condition_variable;

class Semaphore
{
public:
    /**
     * Construct a counting semaphore with an initial value
     * @param cnt The value of the initial semaphore count
     */
    Semaphore(unsigned int cnt);

    /**
     * acquire a semaphore count
     * @param numRes The number of count ressources to acquire
     */
    void acquire(unsigned int numRes = 1);

    /**
     * try to acquire a semaphore count.
     * @param numRes The number of count ressources that the method tries to acquire
     * @return true, if numRes could be aquired
     *         false, otherwise
     */
    bool tryAcquire(unsigned int numRes = 1);

    /**
     * release one semaphore cnt
     * @param numRes The number of count ressources to release
     */
    void release(unsigned int numRes = 1);

private:
    unsigned int cnt;
    mutex mut;
    condition_variable cond;
};

Implementation looks like:

void Semaphore::acquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    while (cnt < numRes)
    {
        cond.wait(lock);
    }

    cnt-=numRes;
}

bool Semaphore::tryAcquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    if (cnt>=numRes)
    {
        cnt -= numRes;
        return true;
    }
    return false;
}

void Semaphore::release(unsigned int numRes)
{
    {
        unique_lock<mutex> lock(mut);
        cnt += numRes;
    }
    // notify <numRes> waiting entities
    for (unsigned int i = 0; i<numRes; ++i)
    {
        cond.notify_one(); 
    }
}

1
这里真正的问题是,如果你有工作线程和有限的资源需求,你不应该关心哪个线程被激活,你只需要关心工作是否完成。唯一的区别在于日志记录。你定义的线程数是可以并行运行的线程数,受资源限制为一个。
如果这对你来说不可接受,那么你需要按照其他答案中所述采取行动。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接