如何在C++类中实现线程安全的互斥保护?

3

我正在尝试在C++中实现一个生产者/消费者模型的多线程程序,与我的项目相关。基本思路是主线程创建第二个线程来监视串口数据,处理数据并将结果放入定期轮询的缓冲区中。我以前从未编写过多线程程序。我一直在阅读许多教程,但它们都是用C语言编写的。我认为我已经掌握了基本概念,但我正在尝试将其转化为C++。对于缓冲区,我想创建一个带有互斥保护的数据类。这是我想出的方法。

1)我是否采取了错误的方法?是否有更聪明的方法来实现受保护的数据类?

2)如果两个线程同时尝试调用ProtectedBuffer::add_back(),以下代码会发生什么?

#include <deque>
#include "pthread.h"

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  pthread_mutex_t mutex;
public:
  void add_back(T data) {
    pthread_mutex_lock(&mutex);
    buffer.push_back(data);
    pthread_mutex_unlock(&mutex);
  }
  void get_front(T &data) {
    pthread_mutex_lock(&mutex);
    data = buffer.front();
    buffer.pop_front();
    pthread_mutex_unlock(&mutex);
  }
};

编辑:感谢大家提供的建议。我已经尝试在下面实现它们。我还添加了一些错误检查,所以如果一个线程不知何故地尝试两次锁定同一个互斥量,它将优雅地失败。我想。

#include "pthread.h"
#include <deque>


class Lock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_lock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~Lock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

class TryToLock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit TryToLock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_trylock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~TryToLock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

template <class T>
class ProtectedBuffer{
    pthread_mutex_t mutex;
    pthread_mutexattr_t mattr;
    std::deque<T> buffer;
    bool failbit;

    ProtectedBuffer(const ProtectedBuffer& x);
    ProtectedBuffer& operator= (const ProtectedBuffer& x);
public:
    ProtectedBuffer() {
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &mattr);
        failbit = false;
    }
    ~ProtectedBuffer() {
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&mattr);
    }
    void add_back(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
    void get_front(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_get_front(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_add_back(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
};

http://www.thegeekstuff.com/2012/07/c-thread-safe-and-reentrant/ - PersianGulf
4个回答

9

有几点需要注意:

  • You need to initialize mutex with pthread_mutex_init in the constructor and free it with pthread_mutex_destroy in the destructor.

  • You must make your class non-copyable and non-assignable (or otherwise implement copy constructor and assignment operator correctly; see above).

  • It's worthwhile making a SBRM helper class for the lock:

    class Lock
    {
        pthread_mutex_t & m;
    public:
        explicit Lock(pthread_mutex_t & _m) : m(_m) { pthread_mutex_lock(&m); }
        ~Lock() { pthread_mutex_unlock(&m); }
    };
    

    Now you can make a synchronized scope like { Lock lk(mutex); /* ... */ }.

关于问题2:并发访问通过锁定互斥量进行序列化。其中一个竞争线程将在获得互斥锁时休眠。

我认为我理解了辅助类。有几个问题。第一,pthread_mutex_t & m;中的&是什么意思?第二,如果我想要一个非阻塞的Lock类,我只需要将pthread_mutex_lock(&m)替换为pthread_mutex_try_lock(&m),并让我的辅助类在返回“忙碌”时抛出异常吗? - Beezum
@Beezum:一)那是一个引用;二)try-lock与作用域资源管理习惯不太搭配;你需要创建一个更复杂的类,可以在未锁定的情况下构建,然后以某种方式公开try-lock调用并返回bool。使用try-lock的代码更加复杂。 - Kerrek SB
顺便问一下,SBRM是什么? - Dhanaraj Durairaj
@DhanarajDurairaj: “基于范围的资源管理”,即使用析构函数和自动变量来管理资源。有时称为“RAII”,但这并不是一个很好的术语。 - Kerrek SB

1
我这样做是错的吗?有没有更聪明的方法来实现受保护的数据类?
对于您目前的实现,我认为您已经有了一个良好的开端。既然您提到了C++化,那么如果您有一个支持C++11的编译器,您可以使用新的线程支持。
您提到您希望主线程轮询此缓冲区,但我没有看到任何允许它这样做的机制。要么get_front在缓冲区中没有内容时应该提供错误,要么get_buffer应该阻塞调用者直到有可用数据。
#include <deque>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  std::mutex mtx;
  std::condition_variable empty_cnd;
  void get_front_i(T &data) {
    data = buffer.front();
    buffer.pop_front();
  }
public:
  void add_back(T data) {
    std::lock_guard<std::mutex> g(mtx);
    bool was_empty = buffer.empty();
    buffer.push_back(data);
    if (was_empty) empty_cnd.notify_one();
  }
  void get_front_check(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    if (buffer.empty()) throw std::underflow_error("no data");
    get_front_i(data);
  }
  void get_front_block(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.empty()) empty_cnd.wait(u);
    get_front_i(data);
    if (!buffer.empty()) empty_cnd.notify_one();
  }
};

如果您想限制添加到缓冲区的数据量,可以添加类似的full_cnd条件变量来检查是否满足满条件,如果为真,则add_back调用将等待。然后,当缓冲区不再满时,get_front_i方法可以发出信号。

如果两个线程同时尝试调用ProtectedBuffer :: add_back()会发生什么?

由于add_back受到互斥保护,因此如果两个线程同时调用它,则一个线程将被阻止调用push_back,直到另一个线程完成。


0
你已经掌握了基础知识,但我建议进一步将互斥锁本身包装在自己的RAII包装器中,例如:
#include <deque> 
#include "pthread.h" 

class ProtectedMutex
{
  pthread_mutex_t &mutex; 
public:
  ProtectedMutex(pthread_mutex_t &m)
    : mutex(m); 
  {
    pthread_mutex_lock(&mutex); 
  }
  ~ProtectedMutex()
  {
    pthread_mutex_unlock(&mutex); 
  }
};

template <class T> 
class ProtectedBuffer { 
  std::deque<T> buffer; 
  pthread_mutex_t mutex; 
public: 
  void add_back(T data) { 
    ProtectedMutex m(mutex); 
    buffer.push_back(data); 
  } 
  void get_front(T &data) { 
    ProtectedMutex m(mutex); 
    data = buffer.front(); 
    buffer.pop_front(); 
  } 
}; 

0
“将结果放入定期轮询的主线程缓冲区中” - CPU浪费和延迟。
“我是不是走错了路?”- 是的。我不知道你的系统对于辅助线程<> GUI线程通信有什么支持,但总有PostMessage() API。
你需要一个Buffer类,当然,它需要一个用于串行rx数据的数据成员和一个执行协议/“处理数据”的方法。你不需要太多其他东西。在第二个线程中,创建一个buffer类实例。加载它,处理数据并将其指针PostMessage/dispatch/BeginInvoke到GUI线程。在串行线程的下一行代码中,使用相同的实例指针变量为串行端口的下一次数据加载创建另一个实例。在GUI显示/日志记录/任何操作之后,GUI线程应该删除接收到的*Buffer。
没有延迟,没有CPU浪费,没有数据复制,没有串行线程和GUI线程在同一个缓冲区实例上工作的机会,没有令人讨厌的、复杂的缓冲区共享代码,没有锁,没有麻烦。它只会很好地工作。
其他任何东西都会很混乱。
编辑-忘记了(2)-不知道,不会碰它。

1
在Windows之外,没有PostMessage API可用。由于他的问题涉及到pthread_mutex_t,因此他可能没有使用它... 在类Unix系统中实现此功能的方法是使用poll或其替代品(epollkqueue等)。 - asveikau
我应该提到这将进入一个嵌入式Linux系统。我查阅了@asveikau的建议,了解了epoll。但是在这种情况下,我有点困惑如何使用它。我是否需要在文件系统中创建一个新文件来向主线程发出信号?这比使用条件变量好吗?我曾考虑过条件变量,但我不希望我的主线程在等待条件变量时被阻塞。它还有其他事情要处理,而串行接口正在工作。 - Beezum
@Beezum - 关键的洞察力在于,最终你的串口是一个文件(并且有一个可以轮询的文件描述符)。我不确定你的主线程做了什么,但如果它正在使用X UI或终端界面,则可能还会阻塞在来自文件描述符的事件上...调用pollepoll的循环可以使您的进程进入睡眠状态,直到它从这些源之一接收到事件。 - asveikau

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接