std::thread
, std::async
和 std::future
组件(例如见这个答案),这些组件很直接明了。
然而,我并不太理解 std::promise
是什么,它的作用以及何时最好使用它。标准文档本身除了类摘要外,并没有太多信息,std::thread 也是如此。
请问有人可以给出一个简明扼要的示例,说明需要使用 std::promise
的情况以及它是最惯用的解决方案吗?std::thread
, std::async
和 std::future
组件(例如见这个答案),这些组件很直接明了。
然而,我并不太理解 std::promise
是什么,它的作用以及何时最好使用它。标准文档本身除了类摘要外,并没有太多信息,std::thread 也是如此。
请问有人可以给出一个简明扼要的示例,说明需要使用 std::promise
的情况以及它是最惯用的解决方案吗?C++11中有两个不同但相关的概念:异步计算(在其他地方调用的函数)和并发执行(线程,可以同时进行工作的东西)。这两个概念有点正交。异步计算只是函数调用的另一种味道,而线程是执行上下文。线程本身也很有用,但在这个讨论中,我将把它们视为实现细节。
异步计算有一个抽象层次结构。为了举例说明,假设我们有一个需要一些参数的函数:
int foo(double, char, bool);
std::future<T>
,表示类型为T
的未来值。该值可以通过成员函数get()
检索,该函数通过等待结果有效地将程序同步化。或者,future支持wait_for()
,可用于探测结果是否已经可用。Futures应被视为普通返回类型的异步插入替代品。对于我们的示例函数,我们期望一个std::future<int>
。std::async
: The most convenient and straight-forward way to perform an asynchronous computation is via the async
function template, which returns the matching future immediately:
auto fut = std::async(foo, 1.5, 'x', false); // is a std::future<int>
We have very little control over the details. In particular, we don't even know if the function is executed concurrently, serially upon get()
, or by some other black magic. However, the result is easily obtained when needed:
auto res = fut.get(); // is an int
We can now consider how to implement something like async
, but in a fashion that we control. For example, we may insist that the function be executed in a separate thread. We already know that we can provide a separate thread by means of the std::thread
class.
The next lower level of abstraction does exactly that: std::packaged_task
. This is a template that wraps a function and provides a future for the functions return value, but the object itself is callable, and calling it is at the user's discretion. We can set it up like this:
std::packaged_task<int(double, char, bool)> tsk(foo);
auto fut = tsk.get_future(); // is a std::future<int>
The future becomes ready once we call the task and the call completes. This is the ideal job for a separate thread. We just have to make sure to move the task into the thread:
std::thread thr(std::move(tsk), 1.5, 'x', false);
The thread starts running immediately. We can either detach
it, or have join
it at the end of the scope, or whenever (e.g. using Anthony Williams's scoped_thread
wrapper, which really should be in the standard library). The details of using std::thread
don't concern us here, though; just be sure to join or detach thr
eventually. What matters is that whenever the function call finishes, our result is ready:
auto res = fut.get(); // as before
Now we're down to the lowest level: How would we implement the packaged task? This is where the std::promise
comes in. The promise is the building block for communicating with a future. The principal steps are these:
The calling thread makes a promise.
The calling thread obtains a future from the promise.
The promise, along with function arguments, are moved into a separate thread.
The new thread executes the function and fulfills the promise.
The original thread retrieves the result.
As an example, here's our very own "packaged task":
template <typename> class my_task;
template <typename R, typename ...Args>
class my_task<R(Args...)>
{
std::function<R(Args...)> fn;
std::promise<R> pr; // the promise of the result
public:
template <typename ...Ts>
explicit my_task(Ts &&... ts) : fn(std::forward<Ts>(ts)...) { }
template <typename ...Ts>
void operator()(Ts &&... ts)
{
pr.set_value(fn(std::forward<Ts>(ts)...)); // fulfill the promise
}
std::future<R> get_future() { return pr.get_future(); }
// disable copy, default move
};
Usage of this template is essentially the same as that of std::packaged_task
. Note that moving the entire task subsumes moving the promise. In more ad-hoc situations, one could also move a promise object explicitly into the new thread and make it a function argument of the thread function, but a task wrapper like the one above seems like a more flexible and less intrusive solution.
Promise(承诺)与异常密切相关。单独的Promise接口无法完全传达它的状态,因此每当在Promise上执行操作时不合理时,就会抛出异常。所有异常都属于std::future_error
类型,它派生自std::logic_error
。首先,介绍一些限制条件:
未设置默认值的Promise是不活跃的。不活跃的Promise可以无后果地死亡。
通过get_future()
获得Future后,Promise变得活跃。但是,只能获取一个Future!
如果要使用其Future,则Promise必须通过set_value()
满足或通过set_exception()
设置异常,然后在其生命周期结束之前。已满足的Promise可以无后果地死亡,并且在Future上可用get()
。带有异常的Promise将在Future上调用get()
时引发存储的异常。如果Promise既没有值也没有异常而死亡,则在Future上调用get()
将引发"broken promise"异常。
这里演示了一个小测试系列,以展示这些各种异常行为。首先,是测试框架:
#include <iostream>
#include <future>
#include <exception>
#include <stdexcept>
int test();
int main()
{
try
{
return test();
}
catch (std::future_error const & e)
{
std::cout << "Future error: " << e.what() << " / " << e.code() << std::endl;
}
catch (std::exception const & e)
{
std::cout << "Standard exception: " << e.what() << std::endl;
}
catch (...)
{
std::cout << "Unknown exception." << std::endl;
}
}
现在开始进入测试环节。
案例1:未激活的 Promise
int test()
{
std::promise<int> pr;
return 0;
}
// fine, no problems
情景2:主动承诺,未使用
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
return 0;
}
// fine, no problems; fut.get() would block indefinitely
案例三:未来合约过多
int test()
{
std::promise<int> pr;
auto fut1 = pr.get_future();
auto fut2 = pr.get_future(); // Error: "Future already retrieved"
return 0;
}
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_value(10);
}
return fut.get();
}
// Fine, returns "10".
案例5:过度满意
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_value(10);
pr2.set_value(10); // Error: "Promise already satisfied"
}
return fut.get();
}
如果有一个以上的set_value
或set_exception
,则会抛出相同的异常。
Case 6: 异常
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
pr2.set_exception(std::make_exception_ptr(std::runtime_error("Booboo")));
}
return fut.get();
}
// throws the runtime_error exception
int test()
{
std::promise<int> pr;
auto fut = pr.get_future();
{
std::promise<int> pr2(std::move(pr));
} // Error: "broken promise"
return fut.get();
}
async
并且只是串行执行操作,但我必须检查启动标志是否有任何保证。 - Kerrek SBstd::function
有许多构造函数;没有理由不将这些暴露给my_task
的使用者。 - Kerrek SBget()
会引发异常。我将通过添加“在销毁之前”来澄清这一点,请告诉我是否足够清楚。 - Kerrek SBpromise
在线程支持库中的future
! - sunny moonstd::future
是一个异步返回对象("从共享状态中读取结果的对象"),而 std::promise
则是一个异步提供者("向共享状态提供结果的对象")。也就是说, promise 是一个你可以在上面 "设置" 结果的东西,以便你可以从关联的 future 上 "获取" 它。std::promise
是一种异步提供者,std::packaged_task
是另一种,std::async
的内部细节是第三种。每个提供者都可以创建一个共享状态,并给您提供一个共享该状态的 std::future
对象,并使该状态准备就绪。
std::async
是一个更高级别的便利工具,它为您提供一个异步结果对象,并在任务完成时内部处理创建异步提供者和使共享状态准备就绪。您可以使用 std::packaged_task
(或 std::bind
和 std::promise
)和 std::thread
模拟它,但使用 std::async
更安全、更容易。
std::promise
稍微低级一些,当你想将异步结果传递给 future,但使结果准备就绪的代码无法封装在适合传递给 std::async
的单个函数中时,可以使用它。例如,您可能有多个 promise 和关联的 future 数组,并且有一个单独的线程执行多个计算并在每个 promise 上设置结果。 async
只允许您返回单个结果,要返回多个结果,您需要多次调用 async
,这可能会浪费资源。future
是异步返回对象的具体示例,它是通过共享状态异步返回结果的对象。promise
是异步提供程序的具体示例,它是将值写入共享状态以便可以异步读取的对象。我的意思就是我所写的。 - Jonathan WakelyBartosz Milewski提供了一篇好的文章。
C++将futures的实现分成一组小块。
std::promise是其中的一部分。
promise是一个传递函数执行结果(或异常)的工具,用于将其从执行函数的线程传递到获取函数future的线程。
...
future是围绕promise通道的接收端构建的同步对象。
因此,如果您想使用future,就必须使用promise来获取异步处理的结果。
页面上的一个示例:
promise<int> intPromise;
future<int> intFuture = intPromise.get_future();
std::thread t(asyncFun, std::move(intPromise));
// do some other stuff
int result = intFuture.get(); // may throw MyException
粗略地说,你可以将std::promise
视为std::future
的另一端(这是不准确的,但为了说明,你可以把它想象成这样)。通信通道的消费者端将使用std::future
从共享状态中消费数据,而生产线程将使用std::promise
向共享状态写入。
std::async
可以概念上(这不是标准规定)理解为一个创建std::promise
,将其推入线程池(或类似的东西,可能是线程池,也可能是新线程等等),并返回相关的std::future
给调用者的函数。在客户端,您需要等待std::future
,而在另一端的线程中,会计算结果并将其存储在std::promise
中。注意:标准要求共享状态和std::future
,但在这种特定情况下并不要求存在std::promise
。 - David Rodríguez - dribeasstd::future
不会在线程上调用join
,它有一个指向共享状态的指针,该共享状态是实际通信缓冲区。共享状态具有同步机制(可能使用std::function
+ std::condition_variable
来锁定调用者,直到std::promise
被满足)。线程的执行与所有这些都是正交的,在许多实现中,您可能会发现std::async
不是由新线程执行然后加入,而是由生存期延伸到程序结束的线程池执行的。 - David Rodríguez - dribeasstd::async
的主要优势是运行时库可以为您做出正确的决策,关于创建线程的数量,在大多数情况下,我期望使用线程池的运行时。目前,VS2012 在底层确实使用了线程池,并且没有违反as-if规则。请注意,对于这个特定的as-if,几乎没有什么需要满足的保证。 - David Rodríguez - dribeasstd::promise
是从异步函数返回信息的通道或路径。 std::future
是同步机制,使调用者等待std::promise
中携带的返回值准备就绪(这意味着其值在函数内已经设置好)。
std::promise
作为一对 promise/future 的终点被创建,使用 get_future()
方法从 std::promise 创建一个 std::future
作为另一端点。 这是一个简单的、一次性的方式,通过消息使两个线程同步,其中一个线程向另一个线程提供数据。std::promise
的 set_value()
方法的线程到使用 std::future
的 get()
方法接收数据的线程。如果调用 future 的 get() 方法多次,则会生成异常。std::promise
的线程没有使用 set_value()
来实现它的承诺,那么当第二个线程调用 get()
从 std::future
收集 promise 时,第二个线程将进入等待状态,直到第一个线程使用 set_value()
方法发送数据来履行 promise。co_await
,也可以使用 std::future
和 std::async
编写协程功能。请参见https://dev59.com/86vka4cB1Zd3GeqP0OEA#50753040中的讨论和示例,其中有一个部分讨论了使用 std::future
与 co_await
的用法。std::cout
打印到控制台的各种消息将是清晰的,而且来自几个线程的文本不会混合在一起。
main()
的第一部分是创建三个附加线程,并使用std::promise
和std::future
在这些线程之间传递数据。有趣的地方在于主线程启动了一个名为T2的线程,该线程将等待来自主线程的数据,执行某些操作,然后将数据发送到第三个线程T3,T3将执行其他操作并将数据发送回主线程。
main()
的第二部分创建了两个线程和一组队列,允许主线程向每个创建的线程发送多个消息。我们不能使用std::promise
和std::future
,因为promise/future组合是单次使用的,无法重复使用。
Sync_queue
类的源代码来自Stroustrup的《C++程序设计语言》第4版。// cpp_threads.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <thread> // std::thread is defined here
#include <future> // std::future and std::promise defined here
#include <list> // std::list which we use to build a message queue on.
static std::atomic<int> kount(1); // this variable is used to provide an identifier for each thread started.
//------------------------------------------------
// create a simple queue to let us send notifications to some of our threads.
// a future and promise are one shot type of notifications.
// we use Sync_queue<> to have a queue between a producer thread and a consumer thread.
// this code taken from chapter 42 section 42.3.4
// The C++ Programming Language, 4th Edition by Bjarne Stroustrup
// copyright 2014 by Pearson Education, Inc.
template<typename Ttype>
class Sync_queue {
public:
void put(const Ttype &val);
void get(Ttype &val);
private:
std::mutex mtx; // mutex used to synchronize queue access
std::condition_variable cond; // used for notifications when things are added to queue
std::list <Ttype> q; // list that is used as a message queue
};
template<typename Ttype>
void Sync_queue<Ttype>::put(const Ttype &val) {
std::lock_guard <std::mutex> lck(mtx);
q.push_back(val);
cond.notify_one();
}
template<typename Ttype>
void Sync_queue<Ttype>::get(Ttype &val) {
std::unique_lock<std::mutex> lck(mtx);
cond.wait(lck, [this]{return !q.empty(); });
val = q.front();
q.pop_front();
}
//------------------------------------------------
// thread function that starts up and gets its identifier and then
// waits for a promise to be filled by some other thread.
void func(std::promise<int> &jj) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
std::cout << " func " << myId << " future " << ll << std::endl;
}
// function takes a promise from one thread and creates a value to provide as a promise to another thread.
void func2(std::promise<int> &jj, std::promise<int>&pp) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
auto promiseValue = ll * 100; // create the value to provide as promised to the next thread in the chain
pp.set_value(promiseValue);
std::cout << " func2 " << myId << " promised " << promiseValue << " ll was " << ll << std::endl;
}
// thread function that starts up and waits for a series of notifications for work to do.
void func3(Sync_queue<int> &q, int iBegin, int iEnd, int *pInts) {
int myId = std::atomic_fetch_add(&kount, 1);
int ll;
q.get(ll); // wait on a notification and when we get it, processes it.
while (ll > 0) {
std::cout << " func3 " << myId << " start loop base " << ll << " " << iBegin << " to " << iEnd << std::endl;
for (int i = iBegin; i < iEnd; i++) {
pInts[i] = ll + i;
}
q.get(ll); // we finished this job so now wait for the next one.
}
}
int _tmain(int argc, _TCHAR* argv[])
{
std::chrono::milliseconds myDur(1000);
// create our various promise and future objects which we are going to use to synchronise our threads
// create our three threads which are going to do some simple things.
std::cout << "MAIN #1 - create our threads." << std::endl;
// thread T1 is going to wait on a promised int
std::promise<int> intPromiseT1;
std::thread t1(func, std::ref(intPromiseT1));
// thread T2 is going to wait on a promised int and then provide a promised int to thread T3
std::promise<int> intPromiseT2;
std::promise<int> intPromiseT3;
std::thread t2(func2, std::ref(intPromiseT2), std::ref(intPromiseT3));
// thread T3 is going to wait on a promised int and then provide a promised int to thread Main
std::promise<int> intPromiseMain;
std::thread t3(func2, std::ref(intPromiseT3), std::ref(intPromiseMain));
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2 - provide the value for promise #1" << std::endl;
intPromiseT1.set_value(22);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.2 - provide the value for promise #2" << std::endl;
std::this_thread::sleep_for(myDur);
intPromiseT2.set_value(1001);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.4 - set_value 1001 completed." << std::endl;
std::future<int> intFutureMain(intPromiseMain.get_future());
auto t3Promised = intFutureMain.get();
std::cout << "MAIN #2.3 - intFutureMain.get() from T3. " << t3Promised << std::endl;
t1.join();
t2.join();
t3.join();
int iArray[100];
Sync_queue<int> q1; // notification queue for messages to thread t11
Sync_queue<int> q2; // notification queue for messages to thread t12
std::thread t11(func3, std::ref(q1), 0, 5, iArray); // start thread t11 with its queue and section of the array
std::this_thread::sleep_for(myDur);
std::thread t12(func3, std::ref(q2), 10, 15, iArray); // start thread t12 with its queue and section of the array
std::this_thread::sleep_for(myDur);
// send a series of jobs to our threads by sending notification to each thread's queue.
for (int i = 0; i < 5; i++) {
std::cout << "MAIN #11 Loop to do array " << i << std::endl;
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q1.put(i + 100);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q2.put(i + 1000);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
}
// close down the job threads so that we can quit.
q1.put(-1); // indicate we are done with agreed upon out of range data value
q2.put(-1); // indicate we are done with agreed upon out of range data value
t11.join();
t12.join();
return 0;
}
MAIN #1 - create our threads.
MAIN #2 - provide the value for promise #1
func 1 future 22
MAIN #2.2 - provide the value for promise #2
func2 2 promised 100100 ll was 1001
func2 3 promised 10010000 ll was 100100
MAIN #2.4 - set_value 1001 completed.
MAIN #2.3 - intFutureMain.get() from T3. 10010000
MAIN #11 Loop to do array 0
func3 4 start loop base 100 0 to 5
func3 5 start loop base 1000 10 to 15
MAIN #11 Loop to do array 1
func3 4 start loop base 101 0 to 5
func3 5 start loop base 1001 10 to 15
MAIN #11 Loop to do array 2
func3 4 start loop base 102 0 to 5
func3 5 start loop base 1002 10 to 15
MAIN #11 Loop to do array 3
func3 4 start loop base 103 0 to 5
func3 5 start loop base 1003 10 to 15
MAIN #11 Loop to do array 4
func3 4 start loop base 104 0 to 5
func3 5 start loop base 1004 10 to 15
异步处理中有三个核心实体,C++11目前主要关注其中的两个。
运行某些逻辑异步所需的核心要素包括:
C++11称我所说的(1)为std::promise
,而(3)则为std::future
。
std::thread
是为(2)提供的唯一公共内容。不幸的是,真正的程序需要管理线程和内存资源,大多数情况下,它们希望任务在线程池上运行,而不是为每个小任务创建和销毁线程(这几乎总是导致不必要的性能损失,并且很容易造成更严重的资源耗尽)。
根据Herb Sutter和C++11脑力信托中的其他人的说法,有暂定计划添加一个std::executor
,它将-就像在Java中一样-是(2)的线程池和逻辑上类似的设置的基础。也许我们会在C++2014中看到它,但我敢打赌更多像是C++17(如果他们搞砸了这些标准,上帝保佑我们)。
承诺是电线的另一端。
想象一下,您需要检索由async
计算的future
的值。然而,您不希望在同一线程中进行计算,甚至不想立即生成一个线程 - 也许您的软件被设计为从池中选择线程,因此您不知道将来会有谁执行计算。
现在,您要向这个(尚未知)的线程/类/实体传递什么?您不会传递future
,因为这是结果。您想传递与future
连接并表示future
的另一端电线的东西,因此您只需查询future
,而不知道谁将实际计算/编写某些内容。
这是Promise
,它是连接到您future
的句柄。如果future
是一个扬声器,并且使用get()
开始侦听直到有声音出现,那么promise
就是麦克风;但不仅仅是任何麦克风,它是连接到您手中扬声器的单根电线的唯一麦克风。您可能知道在另一端的人是谁,但您不需要知道 - 您只需提供它并等待对方说话。
http://www.cplusplus.com/reference/future/promise/
一句话解释:future::get() 会永远等待 promise::set_value()。void print_int(std::future<int>& fut) {
int x = fut.get(); // future would wait prom.set_value forever
std::cout << "value: " << x << '\n';
}
int main()
{
std::promise<int> prom; // create promise
std::future<int> fut = prom.get_future(); // engagement with future
std::thread th1(print_int, std::ref(fut)); // send future to new thread
prom.set_value(10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
std::promise
是创建std::future
的起点。std::future
可以让你获取一个已经被承诺过的值。当你在 future 上调用get()
方法时,它会等待与其对应的std::promise
对象的所有者通过调用set_value
方法设置值。如果 promise 在值被设置前被销毁,并且你在与该 promise 关联的 future 上调用了get()
方法,你将会得到一个std::broken_promise
异常,因为你曾经被承诺过一个值,但是现在无法获得该值。 - James McNellisstd::broken_promise
是最好命名的标识符。而且没有std::atomic_future
。 - Cubbi