获取 ZeroMQ 的套接字只是其中最小的一部分。ZeroMQ 基于一个
协议,该协议在 TCP 上进行了分层,因此如果您选择这条路线,则必须在自定义 Boost.Asio io_service 中重新实现 ZeroMQ。当我尝试使用 Boost.Asio 创建异步
ENet服务时,我遇到了同样的问题,首先尝试使用 Boost.Asio UDP 服务捕获来自 ENet 客户端的流量。ENet 是一个类似于 TCP 的协议,它在 UDP 上进行了分层,因此在那个时候,我所实现的只是在一个几乎无用的状态下捕获数据包。
Boost.Asio 是基于模板的,内置的 io_service 使用模板来包装系统套接字库,以创建 TCP 和 UDP 服务。我的最终解决方案是创建一个自定义 io_service,它包装了 ENet 库而不是系统套接字库,从而允许它使用 ENet 的传输函数,而不必使用内置的 UDP 传输重新实现它们。
同样的方法也可以用于ZeroMQ,但是ZeroMQ本身已经是一个非常高性能的网络库,已经提供了异步I/O。我认为你可以通过使用ZeroMQ的现有API接收消息,并将这些消息传递到io_service线程池中来创建一个可行的解决方案。这样,消息/任务仍将使用Boost.Asio的反应器模式异步处理,而无需重写任何内容。ZeroMQ将提供异步I/O,而Boost.Asio将提供异步任务处理程序/工作者。
现有的io_service也可以与现有的TCP套接字耦合在一起,从而使线程池可以处理TCP(在您的情况下为HTTP)和ZeroMQ。在这种设置中,ZeroMQ任务处理程序完全可以访问TCP服务会话对象,从而允许您将ZeroMQ消息/任务的结果发送回TCP客户端。
以下内容仅用于说明概念。
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
while (1) {
char buffer [10];
zmq_recv (responder_, buffer, 10, 0);
io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
zmq_poll()
请求方面有什么延迟要求?你可以通过在io_service
中运行zmq_poll()
(通过在deadline_timer
中处理它)来实现。如果您认为这是可接受的,我可以在答案中编写一个示例。 - Chad