使用ZeroMQ和Boost::ASIO进行配合

25
我有一个使用ZeroMQ进行消息传递的C++应用程序,但它还必须为基于AJAX/Comet的Web服务提供SGCI连接。为此,我需要使用普通的TCP套接字。我可以通过正常的Posix套接字来实现,但为了保持跨平台便携性并使我的生活更加轻松(我希望如此...),我想使用Boost::ASIO。但现在我遇到了ZMQ想要使用自己的zmq_poll()和ASIO它的io_service.run()之间的冲突...有没有办法让ASIO与0MQ的zmq_poll()一起工作?或者有其他推荐的方法来实现这样的设置吗?注意:我可以通过使用多个线程来解决这个问题 - 但它只是一个运行该程序的非常低SCGI流量的小单核/CPU盒子,因此多线程将浪费资源...

对于我们中不熟悉ZeroMQ的人,您能详细介绍一下zmq_poll()吗?我假设它是某种事件循环? - Sam Miller
ZeroMQ的poll示例(请查看第二个“代码块”)。从zmq_poll文档中可以得知,将zmq_poll()的超时设置为0会使其立即返回。 - g19fanatic
但这会使代码进入旋转/主动等待状态。对性能不是很好... - Chris
2
多线程会浪费资源 - 基于什么?单核 CPU 并不是多线程的问题,特别是当线程几乎没有工作要做时。如果您在可用内存量方面受到严重限制(例如嵌入式解决方案),那么资源浪费将是一个问题,但您并没有说这是情况。 - Chad
你在处理 zmq_poll() 请求方面有什么延迟要求?你可以通过在 io_service 中运行 zmq_poll()(通过在 deadline_timer 中处理它)来实现。如果您认为这是可接受的,我可以在答案中编写一个示例。 - Chad
显示剩余4条评论
5个回答

16

在阅读这里这里的文档之后,特别是以下段落:

ZMQ_FD: 检索与套接字相关联的文件描述符。ZMQ_FD选项将检索与指定套接字相关联的文件描述符。返回的文件描述符可用于将套接字集成到现有事件循环中; ØMQ库将通过使文件描述符准备好进行阅读来以边缘触发的方式通知套接字上的任何挂起事件。

我认为你可以对每个zmq_pollitem_t使用null_buffers并将事件循环推迟到一个io_service中,完全绕过zmq_poll()。然而,在上述文档中似乎存在一些注意事项,特别是:

从返回的文件描述符读取数据并不一定意味着消息可供读取或可以写入基础套接字;应用程序必须使用后续检索的ZMQ_EVENTS选项来检索实际的事件状态。

因此,当您的zmq套接字之一的处理程序被触发时,我认为您需要在处理事件之前多做一些工作。以下是未编译的伪代码:

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

请注意,您不必在这里使用lambda表达式,boost::bind到成员函数就足够了。


那个方法听起来非常有前途。我从使用ipc连接的ZMQ中获取了一个文件描述符。但是,async_read_some(null_buffers(), ...)给了我一个“坏文件描述符”的错误... - Chris
@Chris,在async_read_some之前,你需要将zmq文件描述符分配给asio stream_descriptor。等我不用iPhone时,我会更新我的答案并提供一个示例。 - Sam Miller

2

这个问题发布两年后,有人发布了一个可以完美解决此问题的项目。该项目在这里:https://github.com/zeromq/azmq。讨论其设计的博客文章在这里:https://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/

以下是从自述文件中复制的示例代码:

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>

namespace asio = boost::asio;

int main(int argc, char** argv) {
    asio::io_service ios;
    azmq::sub_socket subscriber(ios);
    subscriber.connect("tcp://192.168.55.112:5556");
    subscriber.connect("tcp://192.168.55.201:7721");
    subscriber.set_option(azmq::socket::subscribe("NASDAQ"));

    azmq::pub_socket publisher(ios);
    publisher.bind("ipc://nasdaq-feed");

    std::array<char, 256> buf;
    for (;;) {
        auto size = subscriber.receive(asio::buffer(buf));
        publisher.send(asio::buffer(buf));
    }
    return 0;
}

看起来不错。如果你尝试了,请在评论中告诉我它在2019年是否仍然有效(我可能会在几个月后尝试,然后更新这个答案)(该存储库已经过时,最后一次提交是一年前)


2
最终我找到了两种可能的解决方案:
  • Sam Miller的方法,我们使用ASIO的事件循环
  • 通过获取acceptorsocket.native()方法中的ASIO文件描述符,并将它们插入zmq_pollitem_t数组中,使用ZeroMQ的事件循环
在SCGI情况下,不断创建和结束连接。因此,我接受了Sam Miller的答案,因为这对我来说是最好的解决方案。处理不断变化的zmq_pollitem_t数组是一个大麻烦,可以通过使用ASIO事件循环来避免。

2
获取 ZeroMQ 的套接字只是其中最小的一部分。ZeroMQ 基于一个协议,该协议在 TCP 上进行了分层,因此如果您选择这条路线,则必须在自定义 Boost.Asio io_service 中重新实现 ZeroMQ。当我尝试使用 Boost.Asio 创建异步ENet服务时,我遇到了同样的问题,首先尝试使用 Boost.Asio UDP 服务捕获来自 ENet 客户端的流量。ENet 是一个类似于 TCP 的协议,它在 UDP 上进行了分层,因此在那个时候,我所实现的只是在一个几乎无用的状态下捕获数据包。
Boost.Asio 是基于模板的,内置的 io_service 使用模板来包装系统套接字库,以创建 TCP 和 UDP 服务。我的最终解决方案是创建一个自定义 io_service,它包装了 ENet 库而不是系统套接字库,从而允许它使用 ENet 的传输函数,而不必使用内置的 UDP 传输重新实现它们。
同样的方法也可以用于ZeroMQ,但是ZeroMQ本身已经是一个非常高性能的网络库,已经提供了异步I/O。我认为你可以通过使用ZeroMQ的现有API接收消息,并将这些消息传递到io_service线程池中来创建一个可行的解决方案。这样,消息/任务仍将使用Boost.Asio的反应器模式异步处理,而无需重写任何内容。ZeroMQ将提供异步I/O,而Boost.Asio将提供异步任务处理程序/工作者。
现有的io_service也可以与现有的TCP套接字耦合在一起,从而使线程池可以处理TCP(在您的情况下为HTTP)和ZeroMQ。在这种设置中,ZeroMQ任务处理程序完全可以访问TCP服务会话对象,从而允许您将ZeroMQ消息/任务的结果发送回TCP客户端。
以下内容仅用于说明概念。
// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}

0

解决方案是除了run()之外还要轮询io_service。

请查看此解决方案以获取一些poll()信息。

使用poll而不是run将允许您轮询zmq的连接而不会有任何阻塞问题。


1
但这会不会导致问题,我被困在等待ZMQ消息的过程中,直到我可以查找ASIO/SCGI消息,因为ZMQ是阻塞的,但ASIO不是?或者对于两个都使用非阻塞,我要自己进行大量轮询,以查看它们是否有工作要做? - Chris
你已经在使用zmq_poll()了。我假设在我的回答中这仍然是正确的。所以对于你的第二个(评论)问题,答案是肯定的,你将同时轮询asio和zmq。我并不是想暗示你会改变zmq方法从zmq_poll()到zsock.recv(),从而使zmq阻塞... - g19fanatic
1
经典轮询(即重复主动询问是否有工作要做)不是一个选项。因此,zmq_poll() 的名称是具有误导性的,因为它不是“经典轮询”。它的名称源于 poll(),就像在 http://linux.die.net/man/2/poll 中所述的那样 - 这不是经典轮询。-- 到目前为止,我只能看到让 ZMQ 和 ASIO 立即返回的解决方案,这会导致经典轮询结果旋转(不好),或者使其中一个或两个阻塞,从而导致处理 ZMQ 和/或 ASIO 消息的巨大延迟。或者我没有正确理解您的评论 :( - Chris
这是你唯一的两个选择。要么是经典轮询,要么是带阻塞的轮询。你可以在经典轮询循环中添加一个睡眠来避免它只是旋转和浪费 CPU 时间。CPU 会在睡眠时进行上下文切换,但你可能无法保证睡眠的延迟时间... - g19fanatic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接