多线程作业队列管理器

37
我需要在交互式应用程序中管理CPU密集型的多任务作业。具体而言,我的应用程序是一个工程设计界面。当用户调整模型的不同参数和选项时,后台运行多个模拟,并在完成后显示结果,甚至在用户仍在编辑值时也可能如此。由于多个模拟所需的时间各不相同(有些是毫秒级别,有些需要5秒,有些需要10分钟),因此基本上是尽快显示反馈,但通常会中止之前开始的但现在已经无需的作业,因为用户的更改已经使它们无效。不同的用户更改可能会使不同的计算无效,因此任何时候我都可能有10个不同的模拟正在运行。某些模拟有多个部分具有依赖关系(模拟A和B可以分别计算,但我需要它们的结果来启动模拟C,因此我需要先等待A和B都完成后再开始C)。
我对处理这种应用程序的代码级方法感到非常自信,那就是某种多线程作业队列。这将包括提交执行作业、设置任务优先级、等待作业完成、指定依赖关系(执行此作业,但仅在作业X和作业Y完成后才执行)、取消符合某些条件的子集作业、查询剩余作业、设置工作线程计数和优先级等功能。而且,多平台支持也非常有用。
这些在软件中并不是新的想法或需求,但我处于应用程序的早期设计阶段,在这个阶段我需要选择使用哪个库来管理这样的任务。过去我曾在C中编写过自己简陋的线程管理器(我认为这是一种成年礼),但我想使用现代工具来基于我的工作,而不是依赖于我以前的hack。
第一个想法是使用OpenMP,但我不确定它是否是我想要的。OpenMP非常适合在细粒度上并行化,自动展开循环等。虽然是多平台的,但它也会用#pragmas侵入您的代码。但最重要的是,它不适用于管理大型任务,特别是取消挂起的作业或指定依赖项。虽然可能可以实现,但不够优雅。
我注意到Google Chrome甚至对最琐碎的任务都使用这样的作业管理器。设计目标似乎是尽可能使用户交互线程轻巧和灵活,因此任何可以异步生成的东西都应该被异步生成。从查看Chrome源代码来看,这似乎不是通用库,但它仍然很有趣,因为它使用异步启动来保持交互速度。这正在变得类似于我所做的事情。
还有其他一些选择: Surge.Act:类似于Boost的库,用于定义作业。它建立在OpenMP之上,但允许链接依赖项,这很好。它似乎没有管理器可以查询、取消作业等。它是一个过时的项目,所以依赖它是可怕的。 Job Queue非常接近我所考虑的内容,但它是一篇5年前的文章,不是一个受支持的库。

Boost.threads提供了很好的平台无关同步,但它不是一个作业管理器。POCO对于任务启动有非常清晰的设计,但也不是一个完整的任务链管理器。(也许我低估了POCO)。

因此,虽然有可用的选项,但我不满意,并且感到有必要再次编写自己的库。但我宁愿使用已经存在的东西。即使在搜索(在SO和网络上)之后,我仍然没有找到合适的东西,尽管我想这必须是一个经常需要的工具,所以肯定有一些社区库或至少常见的设计。 在SO上有一些帖子关于作业队列,但没有任何符合要求的。

我的帖子在这里是为了询问您所有错过的现有工具,和/或您如何编写自己的多线程作业队列。

11个回答

18
我们不得不建立自己的作业队列系统以满足类似于您的需求(UI线程必须始终在33ms内响应,作业可以运行15-15000ms),因为实际上没有任何东西完全满足我们的需求,更别说是高性能的了。
不幸的是,我们的代码就像专有软件一样专有,但我可以给您一些最显著的特点:
- 我们在程序开始时启动每个核心一个线程。每个线程都从全局作业队列中获取任务。作业由函数对象和相关数据块组成(实际上是对func_ptr和void *的说明)。线程0,即快速客户端循环,不允许处理作业,但其余线程可以尽可能多地抓取。 - 作业队列本身应该是无锁数据结构,例如无锁单向链表(Visual Studio 附带一个)。避免使用互斥锁;争夺队列的情况出奇的高,并且抓取互斥锁代价高昂。 - 将作业所需的所有必要数据打包到作业对象本身中,避免从作业指针返回到主堆中,那里必须处理作业和锁之间的争用以及所有其他缓慢而烦人的事情。例如,所有模拟参数应该放入作业的本地数据块中。结果结构显然需要是一个超过作业寿命的东西:您可以通过a)即使在它们完成运行后仍保持作业对象(这样您可以从主线程使用它们的内容)或b)专门为每个作业分配一个结果结构并将指针塞入作业的数据对象中来处理此问题。即使结果本身不会驻留在作业中,但这实际上为作业提供了对其输出内存的独占访问权,因此您不必改变锁定。
  • 实际上,我在上面有所简化,因为我们需要精确地协调每个核心上运行的作业,因此每个核心都有自己的作业队列,但这对你来说可能是不必要的。


  • 您介意解释一下“精确地编排哪些作业在哪些核心上运行,以便每个核心都有自己的作业队列”的含义吗?这样做的原因是什么?是为了处理器亲和性(https://en.wikipedia.org/wiki/Processor_affinity)吗? - athos
    2
    @athos 反过来说:处理器亲和力是我们用来确定哪个任务在哪个核心上运行的方式。我们需要精确地协调事物,以便两个实时线程能够在它们的核心上持续运行,而不会有任何被抢占的风险,同时其他任务被分配到特定的硬件核心上,以最大化缓存局部性、超线程效率等方面的利益。我们为固定的硬件平台开发,因此可以进行非常紧密的优化。 - Crashworks

    5
    我使用Boost.threads自己编写了一个库。我很惊讶,因为只写了很少的代码就得到了很好的效果。如果你找不到现成的东西,不要害怕自己编写。在Boost.threads和你自己编写后的经验之间,这可能比你记得的更容易。
    对于预制选项,请不要忘记Chromium的许可非常友好,所以您可以围绕其代码编写自己的通用库。

    4
    Microsoft正在开发一组技术,用于下一个版本的Visual Studio 2010,称为并发运行时、并行模式库和异步代理库,这些技术可能会有所帮助。并发运行时将提供基于策略的调度,即允许您管理和组合多个调度程序实例(类似于线程池,但具有亲和力和实例之间的负载平衡),并行模式库将提供基于任务的编程和STL样式的编程模型的并行循环。代理库提供了一种基于Actor的编程模型,并支持构建并发数据流管道,即管理上述依赖关系。不幸的是,这还没有发布,因此您可以在我们的团队博客上阅读相关信息或观看channel9上的视频,也可以下载非常大的CTP。
    如果您正在寻找解决方案,Intel的Thread Building Blocks和boost的线程库都是很好的库,并且现在可用。JustSoftwareSolutions已经发布了一个实现std::thread的版本,它与C++0x草案相匹配,当然,如果您正在寻找基于细粒度循环的并行性,则OpenMP是广泛可用的。
    正如其他人所提到的,真正的挑战是正确地识别和分解适合并发执行的任务(即没有受保护的共享状态),理解它们之间的依赖关系,并最小化可能发生在瓶颈上的争用(无论瓶颈是保护共享状态还是确保工作队列的调度循环是低争用或无锁)...而且要做到这一点,不让调度实现细节泄漏到代码的其余部分中。
    -Rick

    3

    我一直在寻找与此相似的需求。我正在开发一个具有4倍速机制的游戏,安排完成不同部分的工作几乎让我的大脑爆炸。我有一套复杂的工作需要在不同的时间分辨率下完成,并根据玩家所加载的系统/区域以不同程度的实际模拟进行。这意味着当玩家从一个系统移动到另一个系统时,我需要将一个系统加载到当前高分辨率模拟中,将最后一个系统转移到较低的分辨率模拟中,并为系统的活动/非活动区域执行相同的操作。不同的模拟是基于每个实体的档案,包括人口、政治、军事和经济行动的大型列表。我将尝试描述我的问题和目前的解决方案,希望能对您或其他人提供一个替代方案。我正在构建的结构的粗略概述将使用以下内容:

    1. cpp-taskflow(一种现代的 C++ 并行任务编程库)我将制作一个模块库,用作作业构建部件。每个条目都会有一个 API 用于初始化和销毁,以及用于通信的指针。我希望以一种方式编写它,使得它们可以使用 cpp-taskflow API 嵌套,以在作业创建时设置所有依赖项,但提供一种实时调整和具备可停止开关的方法。我正在制作的大部分内容都是状态机的决策树,或者行为树的状态机,因此作业数据结构将是时间分辨率标记数据的设置和状态,该数据指向实际统计信息和对象值。
    2. FlatBuffers 我打算使用这个库来构建“作业列表条目”以及“对象包装器”系统。作业队列中的每个条目都将是一个 flatbuffer 对象,描述所需完成的工作(模块的设置),并包含所需完成工作的数据(或共享数据指针)。对象存储的 flatbuffers 将包含表示实体表的数据。对于我来说,大部分实际数据都是需要进行决策和处理的数组。我还打算使用 flatbuffers 作为线程之间的通信/控制通道。我在是否要创建一个主“路由器”线程,让其他所有线程都通过它进行通信,或者每个线程都包含自己的线程通信通道并具有某种发现机制方面感到犹豫。
    3. SQLite 由于只有活动区域/系统需要进行更高分辨率的工作,游戏将创建一些后台作业列表(针对数千个系统及其实体),这些列表将非常大且长期存在。成千上万-数百万个作业(在我的想法中很大),每个作业需要完成未知的时间量。在我的情况下,我不关心它们何时完成,只要它们都完成了(长时间的战役)。我计划让每个线程获取内存中 SQLite 数据库的表作为作业队列。每个条目都包含 flatbuffer 工作的 blob、指向完成通知缓冲区的指针、指向更新控制缓冲区的指针以及其他字段来装饰作业项(位置、数据范围、优先级),这些字段在作业条目生成新作业时填充,并在消耗到数据库中时被消耗。这使我能够在作业之间创建关系,并仅构造查询,以便重新处理/更新作业、删除它们及其依赖项,或者更新/重新排序优先级或依赖项。所有这些都在 SQLite 数据库中使用,这意味着随时可以将整个数据库转储到磁盘上并稍后重新加载它,或者切换到附加到它并从磁盘上处理它。此外,这使我能够访问很多搜索和排序算法工作,通常需要大量不同类型的容器才能完成。能够使用 SQL 查询给我很多处理作业的选项。
    通信队列(作为数据库)的访问方式,我很犹豫是通过相应线程进行访问(每个线程都包含自己的消息数据库,并且模块API的访问已经抽象成锁/互斥量),还是通过一些主路由器线程进行所有更新、添加/删除和通信到一个大型数据库。我不知道哪种方式会给我造成最少的互斥锁问题。我曾经试图使用共享指针创建缓冲池和查找表的怪兽面条代码,使每个线程都有自己的输入和输出缓冲区。那时我决定只把巨大的列表保存到SQLite中。然后我想,为什么不把其他所有东西的flatbuffer对象放入数据表中呢。
    几乎将所有内容都放入数据库中意味着,对于每个模块,我可以编写SQL语句来表示我需要处理的数据视图,并在工作过程中轻松地进行轴心旋转。在数据库中保存这些作业本身意味着同样可以为它们做同样的事情。SQLite支持多线程访问,因此将其用作多线程作业队列管理器不应该太困难。
    总之,Cpp-Taskflow将允许您设置具有依赖关系链接和作业池多线程的复杂嵌套循环。它默认提供了大部分所需的结构。FlatBuffers将允许您创建作业声明和对象封装,便于将其作为工作单元轻松地输入到流缓冲区中,并在作业线程之间传递它们。而SQLite将允许您将流缓冲区作业标记和排队成 blob 条目,以一种应该能够最小化您的工作量添加、搜索、排序、更新和删除的方式进行。 它还使保存和重新加载变得轻松。快照和回滚也应该是可行的,您只需要注意事件的顺序和解决方案即可。
    请注意:此为个人经验分享,具有一定参考性。

    3

    您是否需要类似于线程池的东西?它基于boost::threads,实现了一个简单的线程任务队列,可以将工作函数传递给池化线程。


    2

    你可能想看一下基于流程的编程 - 它是基于异步组件之间的数据块流传。有Java和C#版本的驱动程序,以及许多预编码组件。它本质上是多线程的 - 实际上,唯一的单线程代码在组件内部,尽管您可以将时间约束添加到标准调度规则中。虽然它可能对您需要的内容过于细粒度,但这里可能有您可以使用的东西。


    1

    看一下boost::future(但也要看看这个discussionproposal),它似乎是并行编程的一个非常好的基础(特别是它似乎为C依赖于A和B类型的情况提供了极好的支持)。

    我稍微看了一下OpenMP,但(像你一样)我不认为它适用于除Fortran/C数值代码以外的任何东西。Intel的Threading Building Blocks对我来说更有趣。

    如果需要的话,在boost::thread的基础上自己实现线程池也不是太难。 [解释:一个线程池从一个线程安全的队列中获取functors(任务或工作)。请参见测试基准测试以获取使用示例。我还额外增加了一些复杂性,以支持具有优先级的任务(可选),以及执行任务会生成更多任务到工作队列的情况(这使得知道所有工作实际完成的时间更加棘手;与“待处理”相关的引用可以处理这种情况)。这可能会给你一些想法。]


    非常好。boost::future看起来非常类似于POCO的ActiveResults(http://pocoproject.org/poco/docs/Poco.ActiveResult.html)。再次强调,这不是一个工作队列管理器,但仍然是一个很棒的工具。Boost:thread的基础确实感觉是制作自定义管理器的最佳低级工具包。 - Marc Ditto

    1

    1

    有很多分布式资源管理器可供选择。几乎符合您要求的软件是Sun Grid Engine。 SGE在世界上一些最大的超级计算机上使用,并正在积极开发。

    还有类似的解决方案Torque, Platform LSF, 和 Condor

    听起来您可能想要自己创建一个,但以上所有软件都有足够的功能和性能。


    0

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接