我该如何利用boost::packaged_task、函数参数和boost::asio::io_service?

7
首先,我要为这篇长文道歉。我想尽可能详细地说明问题。
我已经被这个问题困扰了几天,但关于在具有输入参数的函数上正确使用boost::packaged_task的信息非常少。
系统信息
- C++03 - Boost 1.54.0 - CMake 2.8.9
最初的需求
- 我有一个由客户端、服务器和设备组成的设置。 - 客户端通过向服务器发送请求与设备交互。
- 这些请求被检查并路由到适当的设备。 - 请求是异步处理的,并且由于各种原因偶尔会通过boost::asio::io_service::strand排队。
- 请求被放置到设备本身的本地队列中。
- 当请求被确认(不一定完成)时,它被分配一个ID并返回给客户端。

在查看了boost::futures后,我们决定boost::packaged_task正好符合我们的需求。然而,实现中似乎存在一个bug。

看起来packaged_task有几个不同的模板可供选择:

  1. packaged_task<R>
  2. packaged_task<R()>
  3. packaged_task<R(ArgTypes)>
  4. 可能还有其他的我没有列出来。

为确保我正确使用此函数,我从简单的地方开始;以boost::futures页面上的简单示例作为起点。从那里,我创建了四个简单的函数:

  • 无参数,返回int。
  • 有参数,返回int。
  • 无参数,返回std::string
  • 有参数,返回std::string

测试函数

std::string ans("forty two");

int int_no_params()
{
    return 42;
}

int int_with_params(int param)
{
    return param;
}

std::string string_no_params()
{
    return std::string("forty two");
}

std::string string_with_params(std::string & param) // Have tried both with and without '&'
{
    return param;
}

例子1:

int function(void)

    //! Compiles and produces correct result.  
    {
        boost::packaged_task<int()> example(int_no_params);
        boost::future<int> f = example.get_future();
        boost::thread task(boost::move(example));
        int answer = f.get();
        std::cout << "Answer to life and whatnot, in English: " << answer << std::endl;
        task.join();
    }

例子 2:

std::string function(void)

    //! Compiles and produces correct result.
    {
        boost::packaged_task<std::string()> example(string_no_params);
        boost::future<std::string> f = example.get_future();
        boost::thread task(boost::move(example));
        std::string answer = f.get();
        std::cout << "string_no_params: " << answer << std::endl;
        task.join();
    }

例子3:

std::string(std::string& param) 无线程

//! Doesn't compile.
//! error: variable ‘boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> example’ has initializer but incomplete type

{
    boost::packaged_task<std::string(std::string&)> example(string_with_params);
    boost::future<std::string> f = example.get_future();
    example(ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
}

例子 4:

使用 boost::threading

//! Doesn't compile.
//! error: variable ‘boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> example’ has initializer but incomplete type
{
    boost::packaged_task<std::string(std::string&)> example(string_with_params);
    boost::future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}

例子5:

在packaged_task声明中使用扩展初始化器

//! Doesn't compile in C++03, C++11 only.
//! error: extended initializer lists only available with -std=c++11 or -std=gnu++11 [-Werror]
{
    boost::packaged_task<std::string(std::string&)> example
    { boost::bind(&string_with_params, ans) };
    boost::future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}

例子6:

使用shared_ptr进行线程化

以下使用typedef boost::packaged_task<std::string(std::string&)> task_t;

由于打包任务无法复制,绑定shared_ptr<T>::operator()task是在这里找到的建议解决方案。

// error: invalid use of incomplete type ‘class boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>’
// error: incomplete type ‘task_t {aka boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>}’ used in nested name specifier
// boost/thread/future.hpp:1320:11: error: declaration of ‘class boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>’
{
    boost::shared_ptr<task_t> example = boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::future<std::string> f = example->get_future();
    boost::thread task(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}

示例 7:

使用 boost::asio::io_serviceboost::bind

// 错误:无效使用未完成的类型 'class boost::packaged_task(std::basic_string&)>' // 错误:不完整类型 'task_t {aka boost::packaged_task(std::basic_string&)>}' 在嵌套名称指示符中使用 // boost/thread/future.hpp:1320:11:错误:声明了 'class boost::packaged_task(std::basic_string&)>'。

{
    boost::asio::io_service io_service;
    boost::thread_group threads;
    boost::asio::io_service::work work(io_service);

    for (int i = 0; i < 3; ++i)
    {
        threads.create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
    }

    boost::shared_ptr<task_t> example = boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::future<std::string> f = example->get_future();
    io_service.post(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    threads.join_all();
}

这里有什么我做得非常糟糕的地方吗?我感觉我已经详尽地测试过了,但是没有取得任何进展。我已经尝试了所有其他绑定、线程和任务的组合来使其工作,但它根本不起作用。感谢您提供的任何帮助。
最后说明:
我有一个使用 futures 和 promises 的有效解决方案,并通过使用私有函数发布到我的线程,返回一个有效的 future。这个问题似乎不是用户错误。
谢谢阅读。
1个回答

5
虽然我在文档中没有找到明确注明的限制,但是更改历史记录指出,为了符合C++11标准,Boost.Thread的packaged_task可以提供参数类型:

C++11兼容性:将ArgTypes添加到packaged_task模板。 当定义了BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK时提供(默认值来自Boost 1.55)。

相关票据说明,在不可用可变参数模板的情况下,只会提供签名R()
由于C++03缺少可变参数模板,示例3-7将失败。 此外,示例6和7存在类型不匹配。 虽然task_t将其函数类型指定为std::string(std::string&),但在boost::bind()期间绑定到函数对象的第一个且唯一的参数。 由于生成的函数对象不需要更多参数,因此提供给packaged_task的函数类型应为std::string()
虽然packaged_task在C++03中不支持参数,但是一种中间解决方案是创建一个包装较低级别的boost::promise的函数对象类型。 由于没有可变参数模板和完美转发的支持,因此在operator()重载中将会有很多样板代码。 尽管如此,以下是一个忽略promisefuture之间异常处理的基本示例函数对象:
/// @brief basic_task to support function types with arguments.  This
///        provides a minimal feature workaround to Boost.Thread's
///        packaged_task not supporting argument types for C++03.
template <typename Fn>
class basic_task
{
public:
  // @brief The type the future will return.
  typedef typename boost::function_types::result_type<Fn>::type result_type;

  typedef boost::promise<result_type> promise_type;

  /// @brief Constructor.
  template <typename F> 
  explicit basic_task(const F& f)
    : fn_(f),
      promise_(boost::make_shared<promise_type>())
  {}

  // Overload operator() functions.

  void operator()()
  {
    promise_->set_value(fn_());
  }

  template <typename A1>
  void operator()(const A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  template <typename A1>
  void operator()(A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  /// @brief Get a future for this task' promise.
  boost::unique_future<result_type>
  get_future()
  {
    return promise_->get_future();
  }

private:
  boost::function<Fn> fn_;
  boost::shared_ptr<promise_type> promise_;
};

完整的示例系列:

#include <iostream>
#include <string>

#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK

#include <boost/asio.hpp>
#include <boost/function_types/result_type.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>

/// @brief basic_task to support function types with arguments.  This
///        provides a minimal feature workaround to Boost.Thread's
///        packaged_task not supporting argument types for C++03.
template <typename Fn>
class basic_task
{
public:
  // @brief The type the future will return.
  typedef typename boost::function_types::result_type<Fn>::type result_type;

  typedef boost::promise<result_type> promise_type;

  /// @brief Constructor.
  template <typename F> 
  explicit basic_task(const F& f)
    : fn_(f),
      promise_(boost::make_shared<promise_type>())
  {}

  // Overload operator() functions.

  void operator()()
  {
    promise_->set_value(fn_());
  }

  template <typename A1>
  void operator()(const A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  template <typename A1>
  void operator()(A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  /// @brief Get a future for this task' promise.
  boost::unique_future<result_type>
  get_future()
  {
    return promise_->get_future();
  }

private:
  boost::function<Fn> fn_;
  boost::shared_ptr<promise_type> promise_;
};

std::string ans("forty two");

int int_no_params()
{
  return 42;
}

int int_with_params(int param)
{
  return param;
}

std::string string_no_params()
{
  return std::string("forty two");
}

std::string string_with_params(std::string & param)
{
  return param;
}

int main()
{
  // example 1
  {
    boost::packaged_task<int()> example(&int_no_params);
    boost::unique_future<int> f = example.get_future();
    boost::thread task(boost::move(example));
    int answer = f.get();
    std::cout << "Answer to life and whatnot, in English: "
              << answer << std::endl;
    task.join();
  }

  // example 2
  {
    boost::packaged_task<std::string()> example(&string_no_params);
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example));
    std::string answer = f.get();
    std::cout << "string_no_params: " << answer << std::endl;
    task.join();
  }

  // example 3
  {
    basic_task<std::string(std::string&)> example(&string_with_params);
    boost::unique_future<std::string> f = example.get_future();
    example(ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
  }

  // example 4
  {
    basic_task<std::string(std::string&)> example(&string_with_params);
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 5
  {
    basic_task<std::string(std::string&)>
        example(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 6
  {
    typedef boost::packaged_task<std::string()> task_t;
    boost::shared_ptr<task_t> example =
        boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example->get_future();
    boost::thread task(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 7
  {
    boost::asio::io_service io_service;
    boost::thread_group threads;
    boost::asio::io_service::work work(io_service);

    for (int i = 0; i < 3; ++i)
      threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));

    typedef boost::packaged_task<std::string()> task_t;
    boost::shared_ptr<task_t> example =
        boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example->get_future();
    io_service.post(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    io_service.stop();
    threads.join_all();
  }
}

生成的输出如下:

Answer to life and whatnot, in English: 42
string_no_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two

感谢您抽出时间回复。在整个 futures 库引起无尽的麻烦后,我们决定追求其他途径。使用打包任务和 futures 时,具体地说是 .then(...),会在我们的应用程序中导致死锁的情况不少。 - JohanBjorn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接