用最优雅的方式同步C++线程

6
我试图解决以下问题,我知道有多种解决方案,但我正在寻找解决它的最优雅的方式(更少的代码)。
我有4个线程,其中3个线程在无限循环中尝试向易失性整数变量写入唯一值(0、1或2),第四个线程尝试读取此变量的值,并在无限循环中将该值打印到stdout。
我想要同步线程,使写入0的线程先运行,然后是“print”线程,然后是写入1的线程,再次是print线程,以此类推…… 因此,最终我希望在“print”线程的输出中看到一系列的零,然后是一系列的1,然后是2,然后是0,依此类推……
这是程序代码:
volatile int value;
int thid[4];

int main() {
    HANDLE handle[4];
    for (int ii=0;ii<4;ii++) {
        thid[ii]=ii;
        handle[ii] = (HANDLE) CreateThread( NULL, 0, (LPTHREAD_START_ROUTINE)                 ThreadProc, &thid[ii], 0, NULL);
    }
    return 0;
}

void WINAPI ThreadProc( LPVOID param ) {
    int h=*((int*)param);

    switch (h) {
        case 3:
            while(true) {
                cout << value << endl;
            }
            break;
        default:
            while(true) {
                // setting a unique value to the volatile variable
                value=h;
            }
            break;
    }
}

3
为什么不使用C++11和atomic - Walter
1
为了满足您的排序需求,您应该使用C++11的std::condition_variable和std::mutex。 - Wandering Logic
单独使用原子操作是没有用的,你需要一些东西来阻塞线程0并等待线程3处理数据。这需要某种互斥锁模式。 - Mats Petersson
如果你想让这三个“线程”的代码串行运行,你不需要使用线程。只需编写函数并调用它们即可。 - Pete Becker
你的问题更多是“交接”的问题——可以称为生产者/消费者类型的工作负载。你要确保执行 'print' 的线程(消费者)和执行 'inc' 的线程(生产者)以交替的方式工作。在这里,你需要实现一个队列机制或状态机。对于这样的“锁步”实现(生产者/消费者之间的严格交替),最好不要使用超过两个线程。你确定你的问题描述在这个意义上是“准确的”,而且“乒乓”确实是你想要的吗? - FrankH.
是的,如果您将问题视为生产者/消费者类型,则有3个生产者线程更改值和一个消费者线程打印值。实际上,我需要控制调度,使无限while循环的执行顺序为:线程0、线程3、线程1、线程3、线程2、线程3、线程0、线程3等等... 我正在寻找解决此问题的最简单方法... - Hawk89
2个回答

5

您的问题可以通过生产者消费者模式来解决。我受到维基百科的启发,如果您需要更多详细信息,请查看以下链接。

https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem

我使用了随机数生成器来生成易变变量,但你可以更改这一部分。

以下是代码:它在风格方面可以进行改进(使用C++11生成随机数),但它会产生你期望的结果。

#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
#include <stdlib.h>     /* srand, rand */
using namespace std;

//random number generation
std::mutex mutRand;//mutex for random number generation (given that the random generator is not thread safe).
int GenerateNumber()
{
    std::lock_guard<std::mutex> lk(mutRand);
    return rand() % 3;
}

// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }

//      Constants
//
const int num_producers = 3;                //the three producers of random numbers
const int num_consumers = 1;                //the only consumer
const int producer_delay_to_produce = 10;   // in miliseconds
const int consumer_delay_to_consume = 30;   // in miliseconds

const int consumer_max_wait_time = 200;     // in miliseconds - max time that a consumer can wait for a product to be produced.

const int max_production = 1;              // When producers has produced this quantity they will stop to produce
const int max_products = 1;                // Maximum number of products that can be stored

//
//      Variables
//
atomic<int> num_producers_working(0);       // When there's no producer working the consumers will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry

condition_variable is_not_full;             // to indicate that our stack is not full between the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between the thread operations

//
//      Functions
//

//      Produce function, producer_id will produce a product
void produce(int producer_id)
{
    while (true)
    {
        unique_lock<mutex> lock(xmutex);
        int product;

        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = GenerateNumber();
        products.push(product);

        print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
        is_not_empty.notify_all();
    }

}

//      Consume function, consumer_id will consume a product
void consume(int consumer_id)
{
    while (true)
    {
        unique_lock<mutex> lock(xmutex);
        int product;

        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size() > 0; }))
        {
                product = products.top();
                products.pop();

                print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
                is_not_full.notify_all();
        }
    }

}

//      Producer function, this is the body of a producer thread
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i < max_production; ++i)
        {
                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }

        print(stringstream() << "Producer " << id << " has exited\n");
        --num_producers_working;
}

//      Consumer function, this is the body of a consumer thread
void consumer(int id)
{
        // Wait until there is any producer working
        while(num_producers_working == 0) this_thread::yield();

        while(num_producers_working != 0 || products.size() > 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }

        print(stringstream() << "Consumer " << id << " has exited\n");
}

//
//      Main
//

int main()
{
        vector<thread> producers_and_consumers;

        // Create producers
        for(int i = 0; i < num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));

        // Create consumers
        for(int i = 0; i < num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));

        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
        return 0;
}

希望对你有所帮助,如果需要更多信息或者有不同意见,请告诉我 :-)
祝所有法国人国庆日快乐!

2
如果您想要“同步”线程,则可以使用同步对象将每个线程保持在“乒乓”或“滴答”模式中。在C ++ 11中,您可以使用条件变量,这个例子展示了类似于您所要求的内容。

这不完全是那种问题,因为在这里你需要控制调度并将其设置为以下顺序:线程0、线程3、线程1、线程3、线程2、线程3、线程0、线程3等等... - Hawk89
是的,所以它并不完全相同,但基本原理是相同的。 - Mats Petersson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接