等待多个未来?

61

我想运行同一类型的任务(工作线程),但同时不超过一定数量的任务。当一个任务完成时,它的结果将成为一个新任务的输入,然后可以启动新任务。

有没有好的方法在C++11中用async/future范例实现这个需求?

乍一看,这似乎很简单,你只需要使用以下代码来生成多个任务:

std::future<T> result = std::async(...);

然后,运行 result.get() 以获取任务的异步结果。

然而,问题在于future对象必须存储在某种队列中,并逐个等待。虽然可以一遍又一遍地迭代未来的对象,检查它们是否准备好,但由于不必要的CPU负载,这并不是所期望的。

是否有可能以某种方式等待给定集合中任何一个未来就绪并获取其结果?

到目前为止,我唯一想到的选择是一种没有任何异步/future的老派方法。具体来说,生成多个工作线程,在每个线程结束时将其结果推入互斥保护的队列中,并通过条件变量通知等待线程,队列已更新了更多的结果。

是否存在其他使用异步/future的更好解决方案?


12
在C++14中,这将是when_any(futures...).then(foo),但目前来说,你有点运气不好。 - Xeo
5
有了Boost库,你可以使用wait_for_any。如果只用纯C++11,目前还没有这个功能。 - ComicSansMS
1
@Xeo, @ComicSansMS,感谢你们提供的提示!更多关于C++14 when_any的信息在此处:http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2013/n3558.pdf。Boost的wait_for_any正是我需要的。据我所知,它与不支持注册外部等待者(cvs)的std::future不兼容,对吧? - alveko
1
“同时生成多个工作线程,在每个线程结束时将其结果推入受互斥保护的队列中”听起来不错。如果工作线程偶尔检查队列大小≥1,您甚至不需要通知。” - Lightness Races in Orbit
3
抱歉让你失望了,但我认为 when_any/.then(...) 没有被纳入 C++14 中。我认为它们将会出现在在芝加哥启动的并发 TS 中。 - je4d
显示剩余3条评论
4个回答

21

C++11中的线程支持只是一个初步尝试,虽然 std::future 很棒,但它目前还不支持多个等待。

然而,您可以相对低效地模拟它。您需要为每个 std::future 创建一个辅助线程(非常昂贵),然后将它们的 "这个 future 已经准备好了" 收集到一个同步的多生产者单消费者消息队列中,然后设置一个消费者任务来分发给定 std::future 准备就绪的事实。

在这种系统中,std::future 不会增加太多功能,直接声明它们已准备好并将其结果放入上述队列的任务将更有效。如果您选择这条路线,可以编写与 std::asyncstd::thread 模式匹配的包装器,并返回类似于 std::future 的对象,表示队列消息。这基本上涉及重新实现一部分并发库。

如果要使用 std::future,可以创建 shared_future 并使每个依赖任务都依赖于一组 shared_future:即无需中央调度程序即可完成此操作。这不允许像中止/关闭消息这样的东西,我认为这是一个强健的多线程任务系统所必需的。

最后,您可以等待 C++2x,或者当并发 TS 折叠到标准中时,它会自动解决这个问题。


如果在C++14中已经解决了这个问题,那么请更新您的答案,谢谢。 - Steephen
4
抱歉,这并不是C++14版本的代码,我过于乐观了。我添加了一篇文章链接,我认为它可以解决这个问题。它不仅有when_anywhen_all,还可以将.then附加到一组future上,在其中发出条件变量信号,您可以在其上等待:类似于OP中的“老式”方法,但是内容的生产者无需知道队列机制的存在,只需提供一个带有.thenfuture即可。模拟.then需要每个future多达一个额外线程的开销,这远非理想。 - Yakk - Adam Nevraumont
1
C++1z无法解决这个问题。虽然已经达成共识,但显然太晚了,无法在2017年内实现 :-( - graham.reeds
1
@graham.reeds 点头。我的意思是,它在2013年就已经在开发/讨论中了,你可以理解我认为它可能会出现在C++17中。;) 如果你手头有它的标准轨道TS是哪个,我可以用那些信息来改进答案。 - Yakk - Adam Nevraumont
2
在2015年初,期货的扩展被移入了并发TS,当时它们已经基本完成。 - graham.reeds
4
似乎C++20中也没有这个功能。 - Lothar

6
你可以创建所有“一代”的功能,并将所有这些功能提供给你的“二代”任务,然后它们将等待自己的输入。

很棒的想法,点赞!不幸的是,在我的程序中,安排任务的逻辑比我在问题描述中描述的要复杂得多,所以这个想法不能用。 - alveko

2

Facebook的folly在未来有collectAny/collectN/collectAll的功能,我还没有尝试过,但看起来很有前途。


1

鉴于“等待多个未来”的标题吸引了像“是否有一个等待所有未来列表的等待?”这样的问题,您可以通过跟踪待处理的线程来充分实现此功能:

unsigned pending = 0;
for (size_t i = 0; i < N; ++i) {
    ++pending;
    auto callPause =
        [&pending, i, &each, &done]()->unsigned {
            unsigned ret = each();
            results[i] = ret;
            if (!--pending)
                // called in whatever thread happens to finish last
                done(results);
            return ret;
        };
    futures[i] = std::async(std::launch::async, each);
}

完整例子

或许可以使用 std::experimental::when_all 与扩展操作符


1
这个答案需要更好的解释,因为很繁琐去查看它解决了什么问题,以及如何使用它。你的代码片段也不完整,并且只有作为你链接的完整示例的一部分才有意义。 - Florian Winter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接