我正在寻找一个适当的C/C++中工作窃取队列的实现。我在Google上搜索过,但没有找到有用的内容。
也许有人熟悉一个好的开源实现?(我不想实现来自原始学术论文的伪代码)。
我正在寻找一个适当的C/C++中工作窃取队列的实现。我在Google上搜索过,但没有找到有用的内容。
也许有人熟悉一个好的开源实现?(我不想实现来自原始学术论文的伪代码)。
天下没有免费的午餐。
请查看原始的工作窃取论文。这篇论文很难理解。我知道这篇论文包含了理论证明而不是伪代码。然而,没有比TBB更简单的版本。即使有,它也不会给出最佳性能。工作窃取本身就会带来一定的开销,因此优化和技巧非常重要。尤其是,双端队列必须是线程安全的。实现高度可扩展且低开销的同步机制是具有挑战性的。
我真的很好奇你为什么需要它。我认为“适当的实现”指的是像TBB和Cilk这样的东西。再次强调,工作窃取很难实现。
我曾为我们的PARLANSE并行编程语言做过这种工作。该语言提供了任意数量的并行计算,这些计算可以在任何时刻实时交互(同步)。它在X86上实现,每个CPU有一个线程,并且整个实现都是用汇编语言完成的。其中工作窃取代码可能总共只有1000行,而且是非常棘手的代码,因为你希望在非争用情况下它非常快速。
C和C++最大的问题是,在创建表示工作的任务时,分配多少堆栈空间?串行C/C++程序通过简单地过度分配一个巨大的(例如10MB)线性堆栈来避免这个问题,并且没有人会太关心这些堆栈空间中有多少被浪费了。但是,如果您可以创建数千个任务,并且它们都可以在特定的瞬间活动,您就不可能合理地为每个任务分配10MB。因此,现在你要么需要静态确定一个任务将需要多少堆栈空间(图灵难题),要么你需要分配堆栈块(例如每个函数调用),这是目前广泛可用的C/C++编译器无法实现的(例如您可能正在使用的那个编译器)。最后一个方法是限制任务创建,将其限制在任何瞬间只有几百个,并在活动的任务中多路复用几百个非常大的堆栈。如果任务可以相互锁定/挂起状态,则无法执行此操作,因为您将超过阈值。因此,只有当任务仅执行计算时才能执行此操作。这似乎是一个非常严格的约束条件。
对于PARLANSE,我们构建了一个编译器,为每个函数调用在堆上分配激活记录。
WorkStealingQueue
类,它实现了论文“Dynamic Circular Work-stealing Deque”,SPAA,2015中描述的工作窃取队列。如果你正在寻找一个基于pthread或boost::thread的C++独立工作窃取队列类实现,那么恭喜你,据我所知并没有这样的实现。
然而,正如其他人所说,Cilk、TBB和Microsoft的PPL都在其底层实现了工作窃取算法。
问题是,你是想使用一个工作窃取队列还是要自己实现一个?如果只是想使用,则上述选择都是不错的起点,简单地在其中任何一个中安排一个“任务”就足够了。
正如BlueRaja所说,在PPL中的task_group和structured_task_group也可以做到这一点,另外请注意最新版本的Intel TBB中也提供了这些类。并行循环(parallel_for、parallel_for_each)也是使用工作窃取实现的。
如果你必须查看源代码而不是使用实现,TBB是开源的,Microsoft也为其CRT提供了源代码,因此你可以去挖掘。
你还可以在Joe Duffy的博客上查找C#实现(但它是C#,内存模型不同)。
-Rick
HPC Challenge奖
我们的Cilk参赛作品获得了HPC Challenge类别2奖项,荣获2006年“最佳优雅和性能组合”奖项。该奖项于2006年11月14日在Tampa的SC'06上颁发。
structured_task_group类别在PPL中使用了工作窃取队列来实现。如果你需要一个用于线程的WSQ,我会推荐这个。
如果你真的在寻找源代码,我不知道代码是否在ppl.h中给出或者是否有预编译对象;我今晚回家后会检查一下。
我已经将这个C项目移植到了C++。
原始的Steal
在数组扩展时可能会出现脏读。我试图修复这个错误,但最终放弃了,因为我实际上并不需要一个动态增长的堆栈。而是在尝试分配空间的代替方法中,Push
方法只需返回false
。调用者可以执行自旋等待,即while(!stack->Push(value)){}
。
#pragma once
#include <atomic>
// A lock-free stack.
// Push = single producer
// Pop = single consumer (same thread as push)
// Steal = multiple consumer
// All methods, including Push, may fail. Re-issue the request
// if that occurs (spinwait).
template<class T, size_t capacity = 131072>
class WorkStealingStack {
public:
inline WorkStealingStack() {
_top = 1;
_bottom = 1;
}
WorkStealingStack(const WorkStealingStack&) = delete;
inline ~WorkStealingStack()
{
}
// Single producer
inline bool Push(const T& item) {
auto oldtop = _top.load(std::memory_order_relaxed);
auto oldbottom = _bottom.load(std::memory_order_relaxed);
auto numtasks = oldbottom - oldtop;
if (
oldbottom > oldtop && // size_t is unsigned, validate the result is positive
numtasks >= capacity - 1) {
// The caller can decide what to do, they will probably spinwait.
return false;
}
_values[oldbottom % capacity].store(item, std::memory_order_relaxed);
_bottom.fetch_add(1, std::memory_order_release);
return true;
}
// Single consumer
inline bool Pop(T& result) {
size_t oldtop, oldbottom, newtop, newbottom, ot;
oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
ot = oldtop = _top.load(std::memory_order_acquire);
newtop = oldtop + 1;
newbottom = oldbottom - 1;
// Bottom has wrapped around.
if (oldbottom < oldtop) {
_bottom.store(oldtop, std::memory_order_relaxed);
return false;
}
// The queue is empty.
if (oldbottom == oldtop) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
// Make sure that we are not contending for the item.
if (newbottom == oldtop) {
auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
else {
result = ret;
_bottom.store(newtop, std::memory_order_release);
return true;
}
}
// It's uncontended.
result = _values[newbottom % capacity].load(std::memory_order_acquire);
return true;
}
// Multiple consumer.
inline bool Steal(T& result) {
size_t oldtop, newtop, oldbottom;
oldtop = _top.load(std::memory_order_acquire);
oldbottom = _bottom.load(std::memory_order_relaxed);
newtop = oldtop + 1;
if (oldbottom <= oldtop)
return false;
// Make sure that we are not contending for the item.
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
return false;
}
result = _values[oldtop % capacity].load(std::memory_order_relaxed);
return true;
}
private:
// Circular array
std::atomic<T> _values[capacity];
std::atomic<size_t> _top; // queue
std::atomic<size_t> _bottom; // stack
};
完整代码片段(包括单元测试)。 我只在强架构(x86/64)上运行了测试,所以如果您尝试在例如Neon/PPC上使用此代码,则可能会有所不同。