一个线程等待多个线程事件

3

我有一个线程在等待来自多个其他线程的某些事件,这些线程依赖于来自该线程的事件才能继续处理。如何编写代码以避免忙等待?

更具体地说,我有多个线程,它们分别处理视频帧并将其写入屏幕缓冲区(不同位置)。当它们全部完成后,一个线程发送缓冲区以进行绘制,然后这些线程等待绘制调用完成,然后再继续处理它们的下一帧。

这需要用C++实现。使用的C++标准没有限制。

编辑:std::condition_variable似乎不是解决方案。因为它需要互斥量,而在运行时无法知道线程(互斥量)的数量,也不能将互斥量放入std::vector中。

我的程序伪代码如下:

drawThread:
    loop:
        for each processThread:
            wait for event
        draw()

processThread:
    loop:
        process()
        notify drawThread
        wait for draw to finish

编辑:我尝试过:

std::vector<std::atomic_bool> ready_flags;
std::atomic_bool finished_drawing = false;
void drawThread()
{
    while(true)
    { 
        CHECK:
        for(auto& flag: ready_flags)
        {
            if(!flag)
                goto CHECK;
        }
        draw();
        finished_drawing = true;
    }
}
void ProcessThread(int id)
{
    while(true)
    {
        process();
        finished_drawing=false;
        ready_flags[id] = true;
        while(!finished_drawing)
            ;
    }
}

3
std::condition_variable是C++11中的一种同步原语,它用于线程之间的互斥同步。当一个线程等待某个条件为真时,它可以调用std::condition_variable::wait()方法进入阻塞状态,直到另一个线程在符合条件的情况下调用了std::condition_variable::notify_one()或std::condition_variable::notify_all()方法来通知该线程。std::condition_variable还有其他常用的方法,例如std::condition_variable::wait_for()和std::condition_variable::wait_until(),它们允许线程等待一段时间,而不是无限期地等待。使用std::condition_variable需要与std::unique_lockstd::mutex结合使用,以确保正确的线程同步。 - Jeffrey
1
条件变量只需要一个互斥锁。没有规定每个线程必须始终使用自己的独占互斥锁。这样做实际上会破坏互斥锁的目的,因为它们明确设计成可被多个线程访问。毕竟,这就是互斥锁的全部意义所在。 - Sam Varshavchik
1
互斥锁可以防止任意数量的线程并发访问数据。话虽如此,你已经选择了工具(线程数、通信方法),现在你遇到了问题。然而,也许你应该更好地描述目标,参见“XY 问题”。 - Ulrich Eckhardt
你说你不能使用互斥锁的向量。实际上,你可以,只需要通过适当的reserve避免重新分配。或者,如果您事先不知道线程/互斥锁的数量,可以改用互斥锁的std::dequestd::list。问题可能是,您无法同时等待多个条件变量。但是这个问题是可以解决的,例如,通过引入一些(无锁)事件队列并伴随一个单一的条件变量。 - Daniel Langr
@DanielLangr,您能否根据我的情况详细阐述一下?即使只有伪代码也可以。 - sz ppeter
@szppeter 抱歉,我无法在空闲时间内帮忙。但是我可以建议阅读一些关于C++线程的好资料。例如,《C++ Concurrency in Action》是一本相当不错的书。 - Daniel Langr
1个回答

0

我正在努力找到一个可行的解决方案

const int processThreadCount;
std::atomic<int> ready_count{};
std::shared_mutex drawing_mutex;
std::conditional_variable_any can_draw;
void drawThread()
{
    while(true)
    { 
        std::unique_lock lk{drawing_mutex};
        canDraw.wait(lk, [&]{return ready_count==processThreadCount;};
        draw();
        ready_count=0;
    }
}
void ProcessThread(int id)
{
    while(true)
    {
        std::shared_lock lk{drawing_mutex};
        process();
        ++ready_count;
        can_draw.notify_one();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接