Boost asio处理程序中的长时间运行/阻塞操作

8

现状

我使用boost.asio实现了一个TCP服务器,目前在单个线程中使用一个io_service对象,并从该对象中调用run方法。

到目前为止,由于所有必要的信息都存储在内存中(接收处理程序中没有长时间运行的操作),因此服务器能够立即响应客户端的请求。

问题

现在要求发生变化,我需要从数据库(通过ODBC)获取一些信息(基本上是一个长时间的阻塞操作),以便为客户端创建响应。

我看到了几种方法,但不知道哪种方法最好(可能还有更多的方法):

第一种方法

我可以在处理程序中保持长时间运行的操作,并从多个线程中简单地调用io_service.run()。我想我将使用尽可能多的CPU核心?

虽然这种方法易于实现,但我认为由于线程数量有限(由于数据库访问是I/O绑定操作而不是计算绑定操作,因此大部分时间都处于空闲状态),所以我不会得到最佳性能。

第二种方法

此文档的第6节中说:

使用线程进行长时间任务

这种单线程设计的变体仍然使用单个io_service::run()线程来实现协议逻辑。将长时间运行或阻塞任务传递给后台线程,一旦完成,结果就会被发布回io_service::run()线程。

这听起来很有前途,但我不知道该如何实施。是否有人可以为这种方法提供一些代码片段/示例?

第三种方法

Boris Schäling在他的boost介绍的第7.5节中解释了如何使用自定义服务扩展boost.asio。

这看起来需要很多工作。与其他方法相比,这种方法有哪些优点?

2个回答

16

这些方法并非明确互斥。我经常看到第一种和第二种的组合:

  • 一个或多个线程在一个io_service中处理网络I/O。
  • 长时间运行的或阻塞的任务被发布到另一个io_service中。 这个io_service作为线程池,不会干扰处理网络I/O的线程。或者,每次需要长时间运行或阻塞的任务时可以生成一个分离的线程; 然而,线程创建/销毁的开销可能会有显著的影响。

这是一个提供线程池实现的答案。此外,这里有一个基本示例,试图强调两个io_services之间的交互。

#include <iostream>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/chrono.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>

/// @brief Background service will function as a thread-pool where
///        long-standing blocking operations may occur without affecting
///        the network event loop.
boost::asio::io_service background_service;

/// @brief The main io_service will handle network operations.
boost::asio::io_service io_service;

boost::optional<boost::asio::io_service::work> work;

/// @brief ODBC blocking operation.
///
/// @brief data Data to use for query.
/// @brief handler Handler to invoke upon completion of operation.
template <typename Handler>
void query_odbc(unsigned int data,
                Handler handler)
{
  std::cout << "in background service, start querying odbc\n";
  std::cout.flush();
  // Mimic busy work.
  boost::this_thread::sleep_for(boost::chrono::seconds(5));

  std::cout << "in background service, posting odbc result to main service\n";
  std::cout.flush();
  io_service.post(boost::bind(handler, data * 2));
}

/// @brief Functions as a continuation for handle_read, that will be
///        invoked with results from ODBC.
void handle_read_odbc(unsigned int result)
{
  std::stringstream stream;
  stream << "in main service, got " << result << " from odbc.\n";
  std::cout << stream.str();
  std::cout.flush();

  // Allow io_service to stop in this example.
  work = boost::none;
}

/// @brief Mocked up read handler that will post work into a background
///        service.
void handle_read(const boost::system::error_code& error,
                 std::size_t bytes_transferred)
{
  std::cout << "in main service, need to query odbc" << std::endl;
  typedef void (*handler_type)(unsigned int);
  background_service.post(boost::bind(&query_odbc<handler_type>,
    21,                // data
    &handle_read_odbc) // handler
  );

  // Keep io_service event loop running in this example.
  work = boost::in_place(boost::ref(io_service));
} 

/// @brief Loop to show concurrency.
void print_loop(unsigned int iteration)
{
  if (!iteration) return;

  std::cout << "  in main service, doing work.\n";
  std::cout.flush();
  boost::this_thread::sleep_for(boost::chrono::seconds(1));
  io_service.post(boost::bind(&print_loop, --iteration));  
}

int main()
{
  boost::optional<boost::asio::io_service::work> background_work(
      boost::in_place(boost::ref(background_service)));

  // Dedicate 3 threads to performing long-standing blocking operations.
  boost::thread_group background_threads;
  for (std::size_t i = 0; i < 3; ++i)
    background_threads.create_thread(
      boost::bind(&boost::asio::io_service::run, &background_service));

  // Post a mocked up 'handle read' handler into the main io_service.
  io_service.post(boost::bind(&handle_read,
    make_error_code(boost::system::errc::success), 0));

  // Post a mockup loop into the io_service to show concurrency.
  io_service.post(boost::bind(&print_loop, 5));  

  // Run the main io_service.
  io_service.run();

  // Cleanup background.
  background_work = boost::none;
  background_threads.join_all();
}

输出结果:

在主服务中,需要查询ODBC。
在主服务中,正在工作。
在后台服务中,开始查询ODBC。
在主服务中,正在工作。
在主服务中,正在工作。
在主服务中,正在工作。
在主服务中,正在工作。
在后台服务中,将ODBC结果发布到主服务中。
在主服务中,从ODBC获得了42。

请注意,处理主io_service的单个线程将任务发布到background_service,然后继续处理其事件循环,而background_service则会阻塞。一旦background_service获得结果,它将把处理程序发布到主io_service中。


感谢您提供这个出色的答案。我希望我能给它更多的赞 :) - Robert Hegner

1
我们的服务器有一些长时间运行的任务(使用旧协议和存储)。因此,我们的服务器运行了200个线程以避免阻塞服务(是的,200个线程正在运行io_service :: run)。这不是太好的事情,但现在很好用。
唯一的问题是asio :: strand使用所谓的“实现”,当处理程序当前被调用时会被锁定。通过增加这些strands的桶并通过io_service :: post而不是strand包装来“解除”任务已解决此问题。
一些任务可能需要几秒钟甚至几分钟才能运行,目前没有问题。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接