在C/C++中实现工作窃取队列?

36

我正在寻找一个适当的C/C++中工作窃取队列的实现。我在Google上搜索过,但没有找到有用的内容。

也许有人熟悉一个好的开源实现?(我不想实现来自原始学术论文的伪代码)。

10个回答

17

天下没有免费的午餐。

请查看原始的工作窃取论文。这篇论文很难理解。我知道这篇论文包含了理论证明而不是伪代码。然而,没有比TBB更简单的版本。即使有,它也不会给出最佳性能。工作窃取本身就会带来一定的开销,因此优化和技巧非常重要。尤其是,双端队列必须是线程安全的。实现高度可扩展且低开销的同步机制是具有挑战性的。

我真的很好奇你为什么需要它。我认为“适当的实现”指的是像TBB和Cilk这样的东西。再次强调,工作窃取很难实现。


2
这个库 https://github.com/cpp-taskflow/cpp-taskflow 自2018年12月起支持工作窃取。 - Sergey K.

16
实现“工作偷窃”在理论上并不难。您需要一组包含任务的队列,这些任务通过计算和生成其他任务来完成工作。您需要原子访问队列,以将新生成的任务放入这些队列中。最后,您需要一个每个任务在结束时调用的过程,以为执行任务的线程寻找更多工作;该过程需要在工作队列中查找工作。
大多数这样的工作偷窃系统假设有少量线程(通常由真实处理器核支持),每个线程都有一个工作队列。然后您首先尝试从自己的队列中偷取工作,如果它为空,则尝试从其他队列中偷取工作。棘手的是知道要查找哪些队列;串行扫描它们以查找工作非常昂贵,并且可能会在查找工作的线程之间创建大量争用。
到目前为止,这都是相当通用的内容,但有两个主要例外:1)切换上下文(例如,设置处理器上下文寄存器,如“堆栈”)不能以纯C或C ++表示。您可以通过同意使用目标平台特定的机器代码编写包的一部分来解决这个问题。2)对于多处理器的队列的原子访问不能仅使用C或C ++完成(忽略Dekker算法),因此您需要使用汇编语言同步原语(如X86 LOCK XCH或Compare and Swap)对其进行编码。现在,一旦您安全访问了队列更新涉及的代码并不是非常复杂,您可以轻松地在几行C中编写它。
然而,我认为您会发现尝试在C和C ++中使用混合汇编程序编写此类包仍然相当低效,最终您将以汇编语言编写整个程序包。剩下的只有C / C ++兼容的入口点:-}

我曾为我们的PARLANSE并行编程语言做过这种工作。该语言提供了任意数量的并行计算,这些计算可以在任何时刻实时交互(同步)。它在X86上实现,每个CPU有一个线程,并且整个实现都是用汇编语言完成的。其中工作窃取代码可能总共只有1000行,而且是非常棘手的代码,因为你希望在非争用情况下它非常快速。

C和C++最大的问题是,在创建表示工作的任务时,分配多少堆栈空间?串行C/C++程序通过简单地过度分配一个巨大的(例如10MB)线性堆栈来避免这个问题,并且没有人会太关心这些堆栈空间中有多少被浪费了。但是,如果您可以创建数千个任务,并且它们都可以在特定的瞬间活动,您就不可能合理地为每个任务分配10MB。因此,现在你要么需要静态确定一个任务将需要多少堆栈空间(图灵难题),要么你需要分配堆栈块(例如每个函数调用),这是目前广泛可用的C/C++编译器无法实现的(例如您可能正在使用的那个编译器)。最后一个方法是限制任务创建,将其限制在任何瞬间只有几百个,并在活动的任务中多路复用几百个非常大的堆栈。如果任务可以相互锁定/挂起状态,则无法执行此操作,因为您将超过阈值。因此,只有当任务执行计算时才能执行此操作。这似乎是一个非常严格的约束条件。

对于PARLANSE,我们构建了一个编译器,为每个函数调用在堆上分配激活记录。


1
或者你可以做一个明智的选择,直到任务真正运行之前不为其分配空间,并且不要把任务视为暂停和恢复的东西,而是从执行到完成进行运行。 - Phil Miller
2
你的解决方案不太合理。如果构建复杂的系统,当一个工作单元可以随意调用其他任意工作单元时,就无法保证你的任务不需要挂起。你当然可以强制这个属性成立;但是在构建复杂系统时会遇到困难。我们在PARLANSE中构建了百万行的并行程序。 - Ira Baxter
Linux在处理有10,000个线程的进程时表现如何?Windows在每个进程中的线程数达到约15,000个时会崩溃。http://blogs.technet.com/b/markrussinovich/archive/2009/07/08/3261309.aspx。我想要有成千上万个需要等待事件的“线程”。PARLANSE可以做到这一点。我认为Linux或Windows操作系统没有配置好以处理成千上万个线程。我预计会出现各种资源问题,包括仅管理线程句柄。 - Ira Baxter
2
@PSkocik:您的意思是“如果我是CPU k,我应该在哪个队列1..N中寻找可用工作?”糟糕的方法是,如果k的队列为空,就扫描所有其他队列。对于4个队列来说,这可能还可以,但对于32-64个队列来说,这并不理想。一种更好的方法是增加一些开销,使用单个字中的位向量来跟踪哪些队列有工作;它可以通过OR和AND进行廉价更新。... - Ira Baxter
2
如果您锁定操作,可以使位向量准确,但这会使更新变得昂贵,破坏了其目的。因此,我选择不同步执行,这意味着它仅供参考。尽管如此,这是一个非常好的提示,可以让您首先查找到正确的位置。 - Ira Baxter
显示剩余3条评论

3
这个开源库https://github.com/cpp-taskflow/cpp-taskflow从2018年12月份开始支持工作窃取线程池。
请看WorkStealingQueue类,它实现了论文“Dynamic Circular Work-stealing Deque”,SPAA,2015中描述的工作窃取队列。

2

如果你正在寻找一个基于pthread或boost::thread的C++独立工作窃取队列类实现,那么恭喜你,据我所知并没有这样的实现。

然而,正如其他人所说,Cilk、TBB和Microsoft的PPL都在其底层实现了工作窃取算法。

问题是,你是想使用一个工作窃取队列还是要自己实现一个?如果只是想使用,则上述选择都是不错的起点,简单地在其中任何一个中安排一个“任务”就足够了。

正如BlueRaja所说,在PPL中的task_group和structured_task_group也可以做到这一点,另外请注意最新版本的Intel TBB中也提供了这些类。并行循环(parallel_for、parallel_for_each)也是使用工作窃取实现的。

如果你必须查看源代码而不是使用实现,TBB是开源的,Microsoft也为其CRT提供了源代码,因此你可以去挖掘。

你还可以在Joe Duffy的博客上查找C#实现(但它是C#,内存模型不同)。

-Rick


2
存在一种工具,可以以非常优雅的方式简化操作。这是一种非常有效的方法,可以在很短的时间内并行化您的程序。 Cilk项目

HPC Challenge奖

我们的Cilk参赛作品获得了HPC Challenge类别2奖项,荣获2006年“最佳优雅和性能组合”奖项。该奖项于2006年11月14日在Tampa的SC'06上颁发。


1

1

structured_task_group类别在PPL中使用了工作窃取队列来实现。如果你需要一个用于线程的WSQ,我会推荐这个。
如果你真的在寻找源代码,我不知道代码是否在ppl.h中给出或者是否有预编译对象;我今晚回家后会检查一下。


1
OpenMP可能很好地支持工作窃取,尽管它被称为递归并行性。 OpenMP论坛帖子 引用:
OpenMP规范定义了任务构造(可以嵌套,因此非常适合递归并行),但未指定如何实现它们的详细信息。 OpenMP实现(包括gcc)通常使用某种形式的工作窃取来处理任务,尽管确切的算法(以及由此产生的性能)可能会有所不同!
请参见#pragma omp task和#pragma omp taskwait 更新 C++ Concurrency in Action一书的第9章描述了如何为池线程实现“工作窃取”。我自己没有阅读/实施过,但它看起来并不太困难。

0

我已经将这个C项目移植到了C++。

原始的Steal在数组扩展时可能会出现脏读。我试图修复这个错误,但最终放弃了,因为我实际上并不需要一个动态增长的堆栈。而是在尝试分配空间的代替方法中,Push方法只需返回false。调用者可以执行自旋等待,即while(!stack->Push(value)){}

#pragma once
#include <atomic>

  // A lock-free stack.
  // Push = single producer
  // Pop = single consumer (same thread as push)
  // Steal = multiple consumer

  // All methods, including Push, may fail. Re-issue the request
  // if that occurs (spinwait).

  template<class T, size_t capacity = 131072>
  class WorkStealingStack {

  public:
    inline WorkStealingStack() {
      _top = 1;
      _bottom = 1;
    }

    WorkStealingStack(const WorkStealingStack&) = delete;

    inline ~WorkStealingStack()
    {

    }

    // Single producer
    inline bool Push(const T& item) {
      auto oldtop = _top.load(std::memory_order_relaxed);
      auto oldbottom = _bottom.load(std::memory_order_relaxed);
      auto numtasks = oldbottom - oldtop;

      if (
        oldbottom > oldtop && // size_t is unsigned, validate the result is positive
        numtasks >= capacity - 1) {
        // The caller can decide what to do, they will probably spinwait.
        return false;
      }

      _values[oldbottom % capacity].store(item, std::memory_order_relaxed);
      _bottom.fetch_add(1, std::memory_order_release);
      return true;
    }

    // Single consumer
    inline bool Pop(T& result) {

      size_t oldtop, oldbottom, newtop, newbottom, ot;

      oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
      ot = oldtop = _top.load(std::memory_order_acquire);
      newtop = oldtop + 1;
      newbottom = oldbottom - 1;

      // Bottom has wrapped around.
      if (oldbottom < oldtop) {
        _bottom.store(oldtop, std::memory_order_relaxed);
        return false;
      }

      // The queue is empty.
      if (oldbottom == oldtop) {
        _bottom.fetch_add(1, std::memory_order_release);
        return false;
      }

      // Make sure that we are not contending for the item.
      if (newbottom == oldtop) {
        auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
        if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
          _bottom.fetch_add(1, std::memory_order_release);
          return false;
        }
        else {
          result = ret;
          _bottom.store(newtop, std::memory_order_release);
          return true;
        }
      }

      // It's uncontended.
      result = _values[newbottom % capacity].load(std::memory_order_acquire);
      return true;
    }

    // Multiple consumer.
    inline bool Steal(T& result) {
      size_t oldtop, newtop, oldbottom;

      oldtop = _top.load(std::memory_order_acquire);
      oldbottom = _bottom.load(std::memory_order_relaxed);
      newtop = oldtop + 1;

      if (oldbottom <= oldtop)
        return false;

      // Make sure that we are not contending for the item.
      if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
        return false;
      }

      result = _values[oldtop % capacity].load(std::memory_order_relaxed);
      return true;
    }

  private:

    // Circular array
    std::atomic<T> _values[capacity];
    std::atomic<size_t> _top; // queue
    std::atomic<size_t> _bottom; // stack
  };

完整代码片段(包括单元测试)。 我只在强架构(x86/64)上运行了测试,所以如果您尝试在例如Neon/PPC上使用此代码,则可能会有所不同。


-1

我不认为JobSwarm使用工作窃取技术,但这是第一步。我不知道其他用于此目的的开源库。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接