Boost beast::websocket回调函数

6

我正在尝试使用Boost beast::websocket websocket_client_async.cpp 示例,结合 websocket_server_async.cpp 使用。

按照示例,客户端 只是建立连接,向服务器发送一个字符串(服务器简单地回显),打印回复,关闭并退出。

我正在尝试修改客户端以保持会话活动,以便可以重复发送/接收字符串。因此,示例代码的 on_handshake 函数立即通过 ws_.async_write(...) 发送字符串,而我将其分离成自己的 write(...) 函数。

这是我修改后的 session 类:

using tcp = boost::asio::ip::tcp;
namespace websocket = boost::beast::websocket;

void fail(boost::system::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    tcp::resolver resolver_;
    websocket::stream<tcp::socket> ws_;
    std::atomic<bool> io_in_progress_;
    boost::beast::multi_buffer buffer_;
    std::string host_;

public:
    // Resolver and socket require an io_context
    explicit session(boost::asio::io_context& ioc) : resolver_(ioc), ws_(ioc) {
        io_in_progress_ = false;
    }

    bool io_in_progress() const {
        return io_in_progress_;
    }

    // +---------------------+
    // | The "open" sequence |
    // +---------------------+
    void open(char const* host, char const* port)
    {
        host_ = host;

        // Look up the domain name
        resolver_.async_resolve(host, port,
            std::bind( &session::on_resolve, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results)
    {
        if (ec)
            return fail(ec, "resolve");

        boost::asio::async_connect(
            ws_.next_layer(), results.begin(), results.end(),
            std::bind( &session::on_connect, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_connect(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "connect");

        ws_.async_handshake(host_, "/",
            std::bind( &session::on_handshake, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_handshake(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "handshake");
        else {
            std::cout << "Successful handshake with server.\n";
        }
    }

    // +---------------------------+
    // | The "write/read" sequence |
    // +---------------------------+
    void write(const std::string &text)
    {
        io_in_progress_ = true;
        ws_.async_write(boost::asio::buffer(text),
            std::bind( &session::on_write, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_write(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "write");

        ws_.async_read(buffer_,
            std::bind( &session::on_read, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        io_in_progress_ = false; // end of write/read sequence
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "read");

        std::cout << boost::beast::buffers(buffer_.data()) << std::endl;
    }

    // +----------------------+
    // | The "close" sequence |
    // +----------------------+
    void close()
    {
        io_in_progress_ = true;
        ws_.async_close(websocket::close_code::normal,
            std::bind( &session::on_close, shared_from_this(),
                std::placeholders::_1)
        );
    }

    void on_close(boost::system::error_code ec)
    {
        io_in_progress_ = false; // end of close sequence
        if (ec)
            return fail(ec, "close");

        std::cout << "Socket closed successfully.\n";
    }
};

问题在于,虽然连接正常工作并且我可以发送字符串,但是on_read回调函数从未被触发(除非我执行下面描述的丑陋的hack)。
我的main代码如下:
void wait_for_io(std::shared_ptr<session> psession, boost::asio::io_context &ioc)
{
    // Continually try to run the ioc until the callbacks are finally
    // triggered (as indicated by the session::io_in_progress_ flag)
    while (psession->io_in_progress()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        ioc.run();
    }
}

int main(int argc, char** argv)
{
    // Check command line arguments.
    if (argc != 3) {
        std::cerr << "usage info goes here...\n";
        return EXIT_FAILURE;
    }
    const char *host = argv[1], *port = argv[2];

    boost::asio::io_context ioc;
    std::shared_ptr<session> p = std::make_shared<session>(ioc);
    p->open(host, port);
    ioc.run(); // This works. Connection is established and all callbacks are executed.

    p->write("Hello world"); // String is sent & received by server,
                             // even before calling ioc.run()
                             // However, session::on_read callback is never
                             // reached.

    ioc.run();               // This seems to be ignored and returns immediately, so
    wait_for_io(p, ioc);     // <-- so this hack is necessary

    p->close();              // session::on_close is never reached
    ioc.run();               // Again, this seems to be ignored and returns immediately, so
    wait_for_io(p, ioc);     // <-- this is necessary

    return EXIT_SUCCESS;
}

如果我这样做:
p->write("Hello world");
while(1) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

我可以确认该字符串已被服务器发送和接收1,但未到达session::on_read回调函数。
使用p->close()同样出现此问题。
但是,如果我添加奇怪的wait_for_io()函数,一切都正常。我确定这是一个可怕的hack,但我无法弄清楚发生了什么。
1注意:我可以确认消息已经到达服务器,因为我修改了服务器示例以将任何接收到的字符串打印到控制台。这是我所做的唯一修改。echo-to-client功能没有更改。

这只是一个观察,这段代码存在未定义的行为。在write中调用了boost::asio::buffer(text)。但是,在异步操作仍在运行时,text可能会超出作用域。调用者有责任确保传递给流算法的内存缓冲区的生命周期至少保持有效,直到相应的完成处理程序被调用。 - Vinnie Falco
@VinnieFalco 噢...好的点子。那只是我在为SO.SE格式化代码时犯的一个愚蠢的转录错误。 - Blair Fonville
2个回答

4
原因是在第一次调用(此处显示)后,对io_context::run()的调用不起作用。
boost::asio::io_context ioc;
std::shared_ptr<session> p = std::make_shared<session>(ioc);
p->open(host, port);
ioc.run(); // This works. Connection is established and all callbacks are executed.

这是因为在调用任何后续的io_context::run之前,必须先调用函数io_context::restart()
从文档中可以看到:(来源) io_context::restart

重新启动io_context以准备进行后续的run()调用。

在先前调用这些函数时,如果由于io_context停止或工作耗尽而导致这些函数返回,则在第二次或更晚的调用run()、run_one()、poll()或poll_one()函数之前必须调用此函数。调用restart()函数后,io_context对象的stopped()函数将返回false。

1
每次看来我都会忘记需要调用“restart”。 - Vinnie Falco

3

io_context::run 只有当没有未完成的任务时才会返回。如果您确保始终有待处理的 websocket::stream::async_read 调用,那么 run 就不会返回,也不需要使用任何技巧。此外,您将接收到服务器发送的所有消息。


谢谢Vinnie。我确实理解了这一点,并且一开始就以这种方式进行了测试,但是当我想关闭套接字时,结束挂起的调用时遇到了一些困难。但是,老实说,我不明白我上面的代码如何不符合您在第一句话中所写的内容。当我在ioc.run()之前调用p->write(...);时,确实存在未决工作。然后应该进行读取。 - Blair Fonville
澄清一下,当我调用p->write(...)时,函数本身会创建工作 - 这就是为什么我不明白ioc.run()为什么无法执行任何操作。 - Blair Fonville
如果您想关闭WebSocket连接,只需调用async_close。确保您从与其他调用相同的隐式或显式线程中执行此操作。当async_close操作完成时,待处理的async_readasync_write调用将最终以websocket::error::closedboost::asio::operation_aborted完成。我不确定为什么io_context::run立即返回,但这只能是因为没有待处理的工作。 - Vinnie Falco
是的,调用async_close确实有效 - 我不确定我之前遇到的问题是什么;这很容易做到。也许我忘记加入线程或者犯了一些愚蠢的错误。但这有点离题了。我的问题实际上是关于为什么上面的代码不起作用。现在,我只是在测试我对库的理解,以确保我能够牢固掌握写操作代码的技巧。在继续之前,我想搞清楚这个谜团。我开始认为这可能是库中的一个bug。 - Blair Fonville
如果最新版本的Beast存在缺陷,我会非常惊讶。该代码通过测试几乎达到了100%的覆盖率,并且测试非常详尽。示例客户端和服务器按照编写方式工作。如果它们停止工作,那肯定是因为进行了任何更改。我会重新使用原始代码并验证其是否按预期工作,然后逐步应用您的更改以找出导致其发生故障的原因。 - Vinnie Falco
谢谢Vinnie。我一直在使用1.66版本,但是看起来1.67版本修复了一些错误。不过,这并不是一个错误。我已经解决了。再次感谢你的帮助。 - Blair Fonville

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接