提升ASIO并发性能

4

我正在学习如何使用Boost ASIO。以下是从与Boost ASIO文档一起提供的聊天示例中复制的一些代码:

typedef std::deque<chat_message> chat_message_queue;

class chat_client
{
    public:
        chat_client(boost::asio::io_service& io_service,
                tcp::resolver::iterator endpoint_iterator)
            : io_service_(io_service),
            socket_(io_service)
    {
        boost::asio::async_connect(socket_, endpoint_iterator,
                boost::bind(&chat_client::handle_connect, this,
                    boost::asio::placeholders::error));
    }

        void write(const chat_message& msg)
        {
            io_service_.post(boost::bind(&chat_client::do_write, this, msg));
        }

        void close()
        {
            io_service_.post(boost::bind(&chat_client::do_close, this));
        }

    private:

        void handle_connect(const boost::system::error_code& error)
        {
            //Implementation
        }

        void handle_read_header(const boost::system::error_code& error)
        {
            //Implementation
        }

        void handle_read_body(const boost::system::error_code& error)
        {
            //Implementation
        }

        void do_write(chat_message msg)
        {
            bool write_in_progress = !write_msgs_.empty();
            write_msgs_.push_back(msg);
            if (!write_in_progress)
            {
                boost::asio::async_write(socket_,
                        boost::asio::buffer(write_msgs_.front().data(),
                            write_msgs_.front().length()),
                        boost::bind(&chat_client::handle_write, this,
                            boost::asio::placeholders::error));
            }
        }



        void handle_write(const boost::system::error_code& error)
        {
            //Implementation
        }

        void do_close()
        {
            socket_.close();
        }

    private:
        boost::asio::io_service& io_service_;
        tcp::socket socket_;
        chat_message read_msg_;
        chat_message_queue write_msgs_;
};
  1. 写操作是异步的,并且在成员变量write_msgs_read_msgs_周围没有使用锁。这里不应该存在并发问题吗?

  2. 从运行io_service::run的线程中调用post是否安全?那dispatch呢?从未运行io_service::run的线程中进行相同的调用是否安全?

  3. doSend()中,为什么要将消息推入write_msgs_而不是直接发送?同时在相同的函数中,为什么要检查write_msgs_是否为空,只有当它不为空时才继续发送?write_msgs_.empty() = false是什么意思?如何实现的?

  4. 如果do_write()只在一个线程中被调用,那么为什么需要队列来维护发送序列?io_service不会完成手头的任务,然后执行由do_write调用的异步操作吗?在上面提到的示例中,使用dispatch而不是post会有什么区别吗?


嗨Sank,我也在学习asio。你成功编译了你的代码吗?当我尝试聊天示例时,编译器显示:“async_connect”不是“boost :: asio”的成员。(我正在使用asio 1.41.3)。 - Son Do Lenh
@SonDo 我能编译代码。你包含了所有必要的标头文件吗? - bisarch
嗨@sank,我解决了这个问题。我使用了较旧版本的asio(boost 1.42),其中async_connect属于不同的类(socket)。谢谢。 - Son Do Lenh
聊天示例是异步的,但不是并发的。这是典型的反应器。 - evoskuil
1个回答

8
  1. 虽然写入是异步的,但这里没有多线程: do_write() 在一个线程中被调用。当然,发送的缓冲区必须在完成处理程序被调用之前保持活动和不变。

  2. 可以从任何线程调用 post() 和 dispatch()。请阅读 io_service 文档中的“线程安全”部分

  3. 如果正在进行 async_write 操作,并且您再次在同一套接字上调用 async_write,则数据发送的顺序未定义。换句话说,数据将会混乱。最简单的解决方法是创建消息队列:每次完成 async_write 后,发出另一个 async_write。(顺便说一下,对于 async_read,也适用同样的方法。)为什么 write_msgs_.empty() = false 意味着正在进行写操作?因为只要 write_msgs_ 不为空,handle_write(上一个 async_write 的完成处理程序)就会发出另一个 async_write。当 write_msgs_ 为空时,该循环会停止。

  4. 请参阅 文档中有关 async_write 的说明

    此操作是通过对流的 async_write_some 函数进行零个或多个调用来实现的,并称为组合操作。程序必须确保在此操作完成之前,流不执行任何其他写操作(如 async_write、流的 async_write_some 函数或执行写操作的任何其他组合操作)。

至于 dispatchpost 的区别 - 就我所看到的,以上示例中它们是可以互换的。如果我们不希望被发布的函数对象同步调用,则使用 post 是必要的。


请问您能否回答我最近添加的问题的最后一部分? - bisarch
1
@sank 我编辑了我的回答。它是否回答了你问题的最后一部分? - Igor R.
@Igor R. 一个容器 "typedef std::deque<chat_message> chat_message_queue" 是线程安全的吗?如果在主线程中用户执行:write_msgs_.push_back(msg); 而另一个线程在 handler 中同时执行:write_msgs_.pop_front(); 会发生什么? - Alex
@Alex 不,从多个线程访问容器需要显式同步。但是在上面的示例中并非如此。 - Igor R.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接