C++11中有并发容器吗?

64

特别是,我正在寻找一个阻塞队列。C++11中有这样的东西吗?如果没有,我的其他选择是什么?我真的不想再自己降到线程级别了,太容易出错了。


3
在C++0x时代,Scott Meyers提出了这个问题链接。了解在C++11之后这个问题的答案会很有趣。 - Alok Save
非常容易使用基元将标准队列转换为阻塞队列。 - David Heffernan
6个回答

43

据微软Visual C++团队的Diego Dagum所说

一个经常被问到(其实有很多)的问题是关于STL容器是否线程安全。

引用Stephan的话来说,事实上它们并不是线程安全的问题,而是特性:让每个STL容器的成员函数都获取内部锁将会破坏性能。作为一个通用的、高度可重用的库,它实际上也不能提供正确性:放置锁的正确级别取决于程序正在做什么。在这个意义上,单个成员函数往往不是正确级别。

Parallel Patterns Library (PPL) 包含多个容器,可以对其元素进行线程安全的访问:

  • concurrent_vector类 是一个序列容器类,允许随机访问任何元素。它支持并发安全的追加、元素访问、迭代器访问和迭代器遍历等操作。
  • concurrent_queue类 是一个序列容器类,允许先进先出地访问其元素。它支持一些有限的并发安全操作,例如push和try_pop等操作。

这里有一些示例

还有一个有趣的链接:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html


3
是的,但问题在于它只适用于Windows =( - Andrey Suvorov
我认为concurrent_queue类不是严格的FIFO: “在这里,并发安全意味着指针或迭代器始终有效。这并不保证元素初始化或特定遍历顺序。” - Joel

15

C++11本身不提供并发容器,但有库选项可供选择。除了已经提到的PPL之外,不要忘记Intel TBB库。

它具有并发queuehash_mapsetvector实现。但它不仅是一个线程安全的容器库,还带有标准算法的并行版本(for-loop、reduce、sort等)。

Intel TBB网站


1
你能给我并发集的链接吗? - user

10

我很惊讶没有人提到 moodycamel::ConcurrentQueue。我们已经使用它相当长一段时间,它的表现非常出色。它的实现是无锁的,这使得它的速度非常快。使用它的其他原因(引用自官方网站):

C++中没有太多完整的无锁队列。Boost有一个,但它仅限于具有平凡赋值运算符和平凡析构函数的对象,例如。Intel的TBB队列不是无锁队列,也需要平凡的构造函数。有许多学术论文在C++中实现了无锁队列,但可用的源代码很难找到,并且测试更加困难。

一些基准测试和比较可以在这里这里这里找到。

注意:在多个生产者的情况下,弹出元素的顺序不能保证与推入元素的顺序相同(@IgorLevicki),因此如果您需要这种保证,请寻找其他选项。


moodycamel实现的问题在于它不是FIFO(即弹出元素的顺序不能保证与推入元素的顺序相同),因此它不是一种通用解决方案。 - Igor Levicki
@IgorLevicki 是的,你说得对。唯一提供的保证是来自同一生产者的元素将保持它们的相对顺序。 - Miljen Mikic

1

这些容器的接口并没有为此目的而设计。对于它们使用的接口,对客户端可见的锁确实是您可以在保证正确性和可预测行为的同时完成此操作的唯一方法。这也将非常低效,因为获取锁的次数会非常高(相对于良好实现)。

解决方案1

传递值(适用时)。

解决方案2

创建一组简单的附加实现集合,您可以使用它们来传递容器,并持有作用域锁定(将其视为伪C ++):

template <typename TCollection>
class t_locked_collection {
public:
    t_locked_collection(TCollection& inCollection, t_lock& lock) : collection(inCollection), d_lock(lock), d_nocopy() {
    }

    TCollection& collection;
    // your convenience stuff
private:
    t_scope_lock d_lock;
    t_nocopy d_nocopy;
};

然后调用者将锁与集合配对,然后在适当的情况下更新接口以使用(通过)容器类型。这只是一个穷人的类扩展。

这个锁定容器是一个简单的例子,还有一些其他变体。我选择这条路线,因为它真正允许您使用理想的程序粒度级别,即使它不像锁定方法那样透明(语法)。它也相对容易地适应现有程序。至少它的行为是可预测的,不像带有内部锁的集合。

另一种变体可能是:

template <typename TCollection>
class t_lockable_collection {
public:
// ...
private:
    TCollection d_collection;
    t_mutex d_mutex;
};

// example:
typedef t_lockable_collection<std::vector<int> > t_lockable_int_vector;

...在这里,类似于t_locked_collection的类型可以用来暴露底层集合。并不是要暗示这种方法是万无一失的,只是相对抗干扰。


你所说的“按值传递”,是指通过按值传递整个容器来创建一个副本并对其进行操作,还是按值传递容器中的项?请详细说明。 - steffen
@steffen 通过值传递容器的元素。考虑到许多容器采用的接口,这种方法(解决方案1)远非最佳解决方案。此方法在正确性方面几乎没有用处,除非您愿意编写大量异常处理并使用大量共享指针。 - justin

1

我实现了一个并发无序映射表

命名空间为concurrency

{

template<typename T,typename T1>
class unordered_bucket: private std::unordered_map<T,T1>
{
mutable std::recursive_mutex m_mutex;

public:
T1 &operator [](T a)
{
    std::lock_guard<std::recursive_mutex> l(m_mutex);
    return std::unordered_map<T,T1>::operator [](a);
}

size_t size() const noexcept {
    std::lock_guard<std::recursive_mutex> l(m_mutex);
    return  std::unordered_map<T,T1>::size();
}

vector<pair<T,T1>> toVector() const
{
    std::lock_guard<std::recursive_mutex> l(m_mutex);

    vector<pair<T,T1>> ret;
    for(const pair<T,T1> &p:*this)
    {
        ret.push_back(p);
    }
    return ret;
}

bool find(const T &t) const
{
    std::lock_guard<std::recursive_mutex> l(m_mutex);
    if(this->std::unordered_map<T,T1>::find(t) == this->end())
        return false;  //not found
    return true;
}
void erase()
{
    std::lock_guard<std::recursive_mutex> l(m_mutex);
    this->unordered_map<T,T1>::erase(this->begin(),this->end());
}
void erase(const T &t)
{
    std::lock_guard<std::recursive_mutex> l(m_mutex);
    this->unordered_map<T,T1>::erase(t);
}
};

#define BUCKETCOUNT 10
template<typename T,typename T1>
class ConcurrentMap
{
std::vector<unordered_bucket<T,T1>> m_v;
public:
ConcurrentMap():m_v(BUCKETCOUNT){}   //using 10 buckets

T1 &operator [](T a)
{
    std::hash<T> h;
    return m_v[h(a)%BUCKETCOUNT][a];
}

size_t size() const noexcept {
    size_t cnt=0;

    for(const unordered_bucket<T,T1> &ub:m_v)
        cnt=cnt+ub.size();

    return  cnt;
}

vector<pair<T,T1>> toVector() const
{
    vector<pair<T,T1>> ret;
    for(const unordered_bucket<T,T1> &u:m_v)
    {
        const vector<pair<T,T1>> &data=u.toVector();
        ret.insert(ret.end(),data.begin(),data.end());
    }
    return ret;
}

bool find(const T &t) const
{
    for(const unordered_bucket<T,T1> &u:m_v)
        if(true == u.find(t))
            return true;
    return false;
}
void erase()
{
    for(unordered_bucket<T,T1> &u:m_v)
        u.erase();
}
void erase(const T &t)
{
    std::hash<T> h;
    unordered_bucket<T,T1> &ub = m_v[h(t)%BUCKETCOUNT];
    ub.erase(t);
}
};
}

1
在C++11中没有并发容器。
但是以下头文件类提供了使用std::deque的并发队列、栈和优先级容器。 BlockingCollection是一个C++11线程安全的集合类,它是模仿.NET BlockingCollection类设计的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接