是的,每个进程都需要一个服务器端(用于接收来自任何n个参与者的消息)和一个客户端(用于向任何n个参与者发送消息)。然而,在Asio中,我所能找到的唯一发送消息到k个n个参与者的方法是通过创建k个连接的k个线程。
那么你可能没有找对地方,或者根本没有很远地寻找。
异步IO的核心原则是在单个线程上多路复用IO(所有kqueue / epoll / select / IO完成端口等抽象都面向该目标)。
这是一个非常简单的演示,展示了:
- single threaded everything
- a listener that accepts unbounded clients (we could easily add additional listeners)
- we connect to a collection of "peers"
on a heartbeat interval we send all the peers a heartbeat message
for (auto& peer : peers)
async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
});
- additionally it handles asynchronous process signals (INT, TERM) to shutdown all the async operations
"在Coliru上Live¹"
#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;
template <typename T>
static auto reference_eq(T const& obj) {
return [p=&obj](auto& ref) { return &ref == p; };
}
int main() {
using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;
io_context ioc;
tcp::acceptor listener(ioc, {{}, 6868});
listener.set_option(tcp::acceptor::reuse_address(true));
listener.listen();
using Loop = std::function<void()>;
std::list<tcp::socket> clients, peers;
Loop accept_loop = [&] {
listener.async_accept([&](error_code const& ec, tcp::socket s) {
if (!ec) {
std::cout << "New session " << s.remote_endpoint() << std::endl;
clients.push_back(std::move(s));
accept_loop();
}
});
};
tcp::resolver resoler(ioc);
for (auto [host,service] : {
tuple{"www.example.com", "http"},
{"localhost", "6868"},
{"::1", "6868"},
})
{
auto& p = peers.emplace_back(ioc);
async_connect(p, resoler.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
std::cout << "For " << spec << " (" << ec.message() << ")";
if (!ec)
std::cout << " " << p.remote_endpoint();
else
peers.remove_if(reference_eq(p));
std::cout << std::endl;
});
}
std::string const& message = "heartbeat\n";
high_resolution_timer timer(ioc);
Loop heartbeat = [&]() mutable {
timer.expires_from_now(2s);
timer.async_wait([&](error_code ec) {
std::cout << "heartbeat " << ec.message() << std::endl;
if (ec)
return;
for (auto& peer : peers)
async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
});
heartbeat();
});
};
signal_set sigs(ioc, SIGINT, SIGTERM);
sigs.async_wait([&](error_code ec, int sig) {
if (!ec) {
std::cout << "signal: " << strsignal(sig) << std::endl;
listener.cancel();
timer.cancel();
} });
accept_loop();
heartbeat();
ioc.run_for(10s);
}
打印(在我的系统上):
New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled
请注意,“New session”这个客户端是我们本地主机上的对等连接,端口号为6868 :)
当然,在实际应用中,您可能会有一个表示客户端会话的类,也许会有待发送消息的队列,并且可以选择在多个线程上运行(使用strand来同步访问共享对象)。
其他示例
如果您真的希望避免显式集合客户端,请参阅此非常相似的演示:
如何将boost asio tcp套接字传递给线程以向客户端或服务器发送心跳,它
也是从单线程开始,但添加了线程池以进行strand演示
每个会话都有自己的心跳计时器,这意味着每个会话都可以拥有自己的频率。
由于网络访问受限,¹在coliru上无法正常工作。不使用解析程序的回送版本可行:在Coliru上实时演示