Asio点对点网络编程

3

我正在查找Asio套接字文档,但是我找不到有关如何处理以下情况的任何有用信息:

我假设在对等网络中有很多服务器(最多1000个)。服务器将定期彼此通信,因此我不想每次需要时都打开新的客户端连接以发送消息到另一个服务器(巨大的开销)。

同时,创建n个线程,每个线程对应一个客户端->服务器连接也不是真正可行的。

我将实现不同的通信方案(全互连、星型和树形),因此1、log(n)和n个服务器将必须实例化这些n个套接字客户端以创建与其他服务器的连接。

有什么好方法可以简单地执行以下伪代码吗。

pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);

我知道在服务器端可以使用异步连接。然而,我不知道如何从C++/Asio的“客户端”(发送方)角度处理它。

Tl:DR;

当我想要向N个服务器“发送”消息时,每次都不必打开N个连接,也不使用N个线程时,应该使用哪些API和类?


1
我不明白你的问题,也就是说你把很多东西混在一起了。使用异步反应器模式或n个线程是并发的,与点对点直接无关。当然,你的伪代码可能有效,因为sendMessage只需“对于每个连接发送消息”。此外,boost asio不关心它是否是p2p,你只需在程序中拥有服务器和客户端逻辑即可。 - Superlokkus
是的,每个进程都需要一个服务器端(用于接收来自任何n个参与者的消息)和一个客户端(用于向任何n个参与者发送消息)。 然而,就我所了解的Asio而言,向n个参与者中的k个发送消息的唯一方法是创建k个线程和k个连接。 - raycons
如果您坚持使用ASIO的非显式同步标记API,那么不需要考虑这个问题。您甚至可以只使用1个线程。ASIO是异步输入输出的缩写,其主要目的是为IO提供异步或反应堆模式。这意味着您不必使用k或n个线程,因为它没有阻塞API调用。使用多个线程有助于避免过多的工作排队,但这取决于CPU/系统性能。 - Superlokkus
这正是帖子中的问题。当我想要“发送”消息到N个服务器时,我应该使用哪些API和类,而不必每次都打开N个连接,也不使用N个线程。 - raycons
我们是在谈论UDP还是TCP? - Superlokkus
我想使用TCP。 - raycons
2个回答

4

是的,每个进程都需要一个服务器端(用于接收来自任何n个参与者的消息)和一个客户端(用于向任何n个参与者发送消息)。然而,在Asio中,我所能找到的唯一发送消息到k个n个参与者的方法是通过创建k个连接的k个线程。

那么你可能没有找对地方,或者根本没有很远地寻找。

异步IO的核心原则是在单个线程上多路复用IO(所有kqueue / epoll / select / IO完成端口等抽象都面向该目标)。

这是一个非常简单的演示,展示了:

  • single threaded everything
  • a listener that accepts unbounded clients (we could easily add additional listeners)
  • we connect to a collection of "peers"
  • on a heartbeat interval we send all the peers a heartbeat message

        for (auto& peer : peers)
            async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
            });
    
  • additionally it handles asynchronous process signals (INT, TERM) to shutdown all the async operations

"在Coliru上Live¹"

#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;

template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;

    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    tcp::resolver resoler(ioc);
    for (auto [host,service] : {
                tuple{"www.example.com", "http"}, 
                {"localhost", "6868"}, 
                {"::1", "6868"}, 
                // ...
            })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resoler.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
            std::cout << "For " << spec << " (" << ec.message() << ")";
            if (!ec)
                std::cout << " " << p.remote_endpoint();
            else
                peers.remove_if(reference_eq(p));
            std::cout << std::endl;
        });
    }

    std::string const& message = "heartbeat\n";
    high_resolution_timer timer(ioc);
    Loop heartbeat = [&]() mutable {
        timer.expires_from_now(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        } });

    accept_loop();
    heartbeat();

    ioc.run_for(10s); // max time for Coliru, or just `run()`
}

打印(在我的系统上):

New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled

请注意,“New session”这个客户端是我们本地主机上的对等连接,端口号为6868 :)
当然,在实际应用中,您可能会有一个表示客户端会话的类,也许会有待发送消息的队列,并且可以选择在多个线程上运行(使用strand来同步访问共享对象)。
其他示例
如果您真的希望避免显式集合客户端,请参阅此非常相似的演示:如何将boost asio tcp套接字传递给线程以向客户端或服务器发送心跳,它
也是从单线程开始,但添加了线程池以进行strand演示
每个会话都有自己的心跳计时器,这意味着每个会话都可以拥有自己的频率。

由于网络访问受限,¹在coliru上无法正常工作。不使用解析程序的回送版本可行:在Coliru上实时演示


好的,基本上,我们创建一个方法来接收传入的连接并将它们推入列表中,以便连接不会关闭。然后我们创建到每个对等体(每个其他服务器)的连接,并存储它们,以便我们可以心跳检测它们。之后,我只需要取出其中一个对等体对象并异步写入它,以处理它。我认为这正是我正在寻找的,谢谢。 我主要在官方Asio资源中寻找。您能指向其他好的相关资源吗? - raycons
这就是我选择在这里做的事情,因为生命周期简单且单线程使得拥有集中式集合变得便宜。 - sehe
1
很遗憾地说,我有三到四本关于Asio的书籍,但没有一本是特别好的学习书籍(大多数都显示出许多反模式,甚至有些明显存在错误)。我从SO上的人们中学习,还有https://cpplang.now.sh/。 - sehe
auto& p = peers.emplace_back(ioc); 这段代码似乎对我不起作用(我使用的是C++14),在这种情况下,emplace_back返回void。 - raycons
那是旧的标准库(参见 https://en.cppreference.com/w/cpp/container/list/emplace_back,"since c++17")。你可以使用类似 l.emplace_back(...); auto& p = l.back(); 的东西。 - sehe
哇,你的回答详细又好,我别无选择只能点赞你的回答。 - Superlokkus

1

由于你表示想使用TCP即基于连接的协议,因此可以使用异步ASIO API,并且可以依赖于1个线程,因为异步即反应器模式调用不会阻塞。

你的服务器将使用boost::asio::ip::tcp::socket进行boost::asio::async_write,这与一个TCP连接发生相等。你作为参数给出的回调函数将在发送完成后调用,但async_write会立即返回。接收方类似于客户端。为了获得对传入客户端的TCP连接,你需要使用boost::asio::ip::tcp::resolver,它通过在客户端上监听boost::asio::ip::tcp::resolver::async_resolve并在服务器端初始化一个具有boost::asio::ip::tcp::endpointboost::asio::ip::tcp::acceptor::async_acceptboost::asio::ip::tcp::acceptor来为你打开新的TCP连接/套接字。实际上,你需要两个,一个用于IPv4,另一个用于IPv6。

由于在服务器端有一个TCP连接状态,您通常需要在中央位置跟踪它,但为了避免这种争用并简化模式,通常使用继承 std::enable_shared_from_this 的类,这将在回调函数中提供自身的 std::shared_pointerstd::async_write,以便在发送和接收之间,在线程没有被阻塞的情况下,不会被遗忘即删除。

对于读取,我建议使用 boost::asio::async_read_until 和一般的 boost::asio::streambuf

通过运行循环中的 boost::asio::io_context::run 的1个线程就足够了,每当许多连接需要处理接收到的内容或者需要生成新的要发送的东西时,它都会解除阻塞。

总体项目有点超出范围,如果您能缩小问题的范围,或者更好地阅读演示和示例,那将会很有帮助。我写过与您所想的类似的东西,一个弹性覆盖网络:https://github.com/Superlokkus/code


但是我是否需要创建N个boost::asio::ip::tcp::socket类型的对象并将它们放入向量中,然后在需要时调用它们? - raycons
不,这要归功于shared_from_this模式,您将为boost::asio::ip::tcp::acceptor::async_accept::async_accept提供一个回调函数,该函数创建表示连接的类的新实例。 它不必将该实例放入任何东西,因为您的连接类将通过boost::asio::async_read_until开始第一个异步接收,并且该回调对象(例如lambda)将使用[me=shared_from_this()]捕获一个std::shared_ptr实例,直到您获取到数据或关闭连接。 您将没有中央连接字典。 - Superlokkus
当然,如果这些连接操作一些全局状态,你会需要在创建时给它们一个引用,并且如果你通过 boost::asio::io_context::strandstd::atomics 或互斥锁在多个线程上运行 io_context::run,你必须在其上进行同步。 - Superlokkus
基本上,您必须将整个过程视为用户空间多线程协作调度,这与大多数操作系统中本地的异步IO API完美匹配,否则将不得不阻塞线程,而不仅仅是通过在队列上传递用户空间任务即回调调用来传递网络适配器/卡引起的中断。 - Superlokkus
1
查看我的答案以获取演示。如果您真的希望避免显式客户端集合,请参见这个非常相似的演示,它也从单线程开始,但添加了一个线程池用于展示strand。每个会话都有一个心跳计时器,这意味着每个会话可以拥有自己的频率。这可能是您想要的,也可能不是。 - sehe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接