两个线程之间同步队列

4

这是一个简单的程序,它有一个名为 start() 的函数,该函数通过使用无限循环等待用户输入并将其存储在队列中。start() 在单独的线程中运行。用户输入某个值后,在主程序中队列的大小仍为零。如何同步队列?
代码:source.cpp

#include <iostream>
#include "kl.h"

using namespace std;

int main()
{
    std::thread t1(start);
    while (1)
    {
        if (q.size() > 0)
        {
            std::cout << "never gets inside this if\n";
            std::string first = q.front();
            q.pop();
        }           
    }
    t1.join();
}

code: kl.h

#include <queue>
#include <iostream> 
#include <string>

void start();
static std::queue<std::string> q;

代码:kl.cpp

#include "kl.h"
using namespace std;

void start()
{
    char i;
    string str;
    while (1)
    {
        for (i = 0; i <= 1000; i++)
        {
            //other stuff and str input
            q.push(str);
        }

    }
}

添加了完整的代码 - user6275035
1
因为您在头文件中对队列使用了“静态”,所以实际上有两个不同的队列,分别位于每个cpp文件中。这就是为什么主函数中的队列始终为空的原因。 - tony
@tony 移除静态后显示链接器错误。 - user6275035
是的,我怀疑它会这样做。尝试使用extern。然后在main.cpp中使用std::queue<std::string> q;。头文件说“这个东西存在于某个地方”,cpp文件说“它在这里”。更好的方法是,在main函数中将其设为静态变量,从头文件中删除它,并通过引用传递到线程函数中。 - tony
@tony 谢谢!这个也有效。 - user6275035
Boost在https://www.boost.org/doc/libs/1_76_0/doc/html/thread/sds.html#thread.sds.synchronized_queues上提供了一个同步队列。此外,Boost还提供了一个无锁队列。 - fuzzyTew
3个回答

2
您的代码包含一个竞争条件 - 它导致了崩溃;两个线程都有可能修改共享队列。(另外,使用char i来循环处理值最好不要超过1000。)
您应该使用std::mutex保护共享队列,并使用std::condition_variable通知有原因检查队列。
具体来说,您应该考虑以下内容(这非常适用于生产者消费者的情况):
  1. 仅在持有互斥锁时访问队列。

  2. 使用条件变量通知已将某些内容推入其中。

  3. 使用条件变量指定何时继续处理的条件。

这是您代码的重写版:
#include <iostream>
#include <queue>
#include <thread>
#include <condition_variable>
#include <mutex>

using namespace std;

std::queue<std::string> q;
std::mutex m;
std::condition_variable cv;

void start()
{
    string str;
    for (std::size_t i = 0; i <= 1000; i++) {
        //other stuff and str input
        std::cout << "here" << std::endl;
        std::unique_lock<std::mutex> lk(m);
        q.push(str);
        lk.unlock();
        cv.notify_one();
    }
}

int main()
{
    std::thread t1(start);
    for (std::size_t i = 0; i <= 1000; i++)
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return !q.empty();});
        std::string first = q.front();
        q.pop();    
    }
    t1.join();
}

@ami-travory,您能详细说明一下这行代码的作用和语法吗? cv.wait(lk, []{return !q.empty();}); - Paul

0
/*  Here I have a code snippate with Separate class for 
    Producing and Consuming along with buffer class */


#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <deque>
#include <vector>
using namespace std;
mutex _mutex_1,_mutex_2;
condition_variable cv;

template <typename T>
class Queue
{
    deque<T> _buffer;
    const unsigned int max_size = 10;
public:
    Queue() = default;
    void push(const T& item)
    {
        while(1)
        {
            unique_lock<mutex> locker(_mutex_1);
            cv.wait(locker,[this](){ return _buffer.size() < max_size; });
            _buffer.push_back(item);
            locker.unlock();
            cv.notify_all();
            return;
        }
    }

    T pop()
    {
        while(1)
        {
            unique_lock<mutex> locker(_mutex_1);
            cv.wait(locker,[this](){ return _buffer.size() > 0; });
            int back = _buffer.back();
            _buffer.pop_back();
            locker.unlock();
            cv.notify_all();
            return back;
        }
    }
};



class Producer
{
    Queue<int>* _buffer;

public:
    Producer(Queue<int>* _buf)
    {
        this->_buffer = _buf;
    }
    void run()
    {
        while(1)
        {
            auto num = rand()%100;
            _buffer->push(num);
            _mutex_2.lock();
            cout<<"Produced:"<<num<<endl;
            this_thread::sleep_for(std::chrono::milliseconds(50));
            _mutex_2.unlock();
        }
    }
};


class Consumer
{
    Queue<int>* _buffer;

public:
    Consumer(Queue<int>* _buf)
    {
        this->_buffer = _buf;
    }
    void run()
    {
        while(1)
        {
            auto num = _buffer->pop();
            _mutex_2.lock();
            cout<<"Consumed:"<<num<<endl;
            this_thread::sleep_for(chrono::milliseconds(50));
            _mutex_2.unlock();
        }
    }
};



void client()
{

    Queue<int> b;
    Producer p(&b);
    Consumer c(&b);
    thread producer_thread(&Producer::run, &p);
    thread consumer_thread(&Consumer::run, &c);
    producer_thread.join();
    consumer_thread.join();
}


int main()
{
    client();

    return 0;
}

0

我的同步队列类示例及其用法:

template<typename T>
class SyncQueue
{
    std::queue<T> m_Que;
    std::mutex m_Lock;
    std::condition_variable m_ConVar;

public:
    void enque(T item)
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        m_Que.push(item);
        lock.unlock();
        m_ConVar.notify_all();
    }

    T deque()
    {
        std::unique_lock<std::mutex> lock(m_Lock);

        do
        {
            m_ConVar.wait(lock);

        } while(m_Que.size() == 0); // extra check from spontaneous notifications

        auto ret = m_Que.front();
        m_Que.pop();

        return ret;
    }
};

int main()
{
    using namespace std::chrono_literals;

    SyncQueue<int> sq;

    std::thread consumer([&sq]()
    {
        std::cout << "consumer" << std::endl;

        for(;;)
        {
            std::cout << sq.deque() << std::endl;
        }
    });

    std::thread provider([&sq]()
    {
        std::this_thread::sleep_for(1s);
        sq.enque(1);
        std::this_thread::sleep_for(3s);
        sq.enque(2);
        std::this_thread::sleep_for(5s);
        sq.enque(3);
    });

    consumer.join();

    return 0;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接