boost::asio::socket线程安全性

25

这是我原问题的简化版本。

我有几个线程,它们写入一个boost asio套接字。这似乎非常好地工作,没有问题。

文档说,共享套接字不是线程安全的(在这里,在底部),所以我想知道是否应该使用互斥锁或其他方式保护套接字。

这个问题坚持认为需要保护,但并没有提供如何做到这一点的建议。

所有对我的原始问题的答案都坚称我正在做危险的事情,大多数人敦促我用async_writes或者更复杂的方法替换我的写入操作。然而,我不愿意这样做,因为这会使已经正常工作的代码变得更加复杂,回答者中没有人说服我他们知道自己在说什么——他们似乎读过与我相同的文档,并且只是猜测,就像我一样。

因此,我编写了一个简单的程序来压力测试从两个线程向共享套接字写入。

下面是服务器代码,它只是将从客户端接收到的内容写出。

int main()
{
    boost::asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

    tcp::socket socket(io_service);
    acceptor.accept(socket);

    for (;;)
    {
        char mybuffer[1256];
        int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
        mybuffer[len] = '\0';
        std::cout << mybuffer;
        std::cout.flush();

    }

  return 0;
}

这里是客户端,它创建了两个线程,尽可能快地向共享套接字写入数据。

boost::asio::ip::tcp::socket * psocket;

void speaker1()
{
    string msg("speaker1: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}
void speaker2()
{
    string msg("speaker2: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}

int main(int argc, char* argv[])
{

    boost::asio::io_service io_service;

  // connect to server

    tcp::resolver resolver(io_service);
    tcp::resolver::query query("localhost", "3001");
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;
    psocket = new tcp::socket(io_service);
    boost::system::error_code error = boost::asio::error::host_not_found;
    while (error && endpoint_iterator != end)
    {
        psocket->close();
        psocket->connect(*endpoint_iterator++, error);
    }


    boost::thread t1( speaker1 );
    boost::thread t2( speaker2 );

    Sleep(50000);

}

这个方法可行!就我所知,它完美地运行着。客户端不会崩溃,消息也不会混乱。它们通常会交替到达服务器,每个线程一个消息。有时一个线程在另一个之前会收到两个或三个消息,但我认为只要没有混淆并且所有消息都到达了,这并不是问题。

我的结论是:在某种理论意义上,socket可能不是线程安全的,但很难使它失败,所以我不会担心它。


1
使用io_service :: post()调用write()处理程序几乎没有价值。您正在过度复杂化,应该使用async_write()。 - Sam Miller
1
你的实现在 ASIO 方面没有意义。那种代码风格中没有必要使用 ASIO。 - Etherealone
为什么您认为需要多个线程? - autistic
9个回答

13

重新研究了async_write的代码后,我现在确信只有在数据包大小小于时,任何写操作才是线程安全的。

default_max_transfer_size = 65536;

当调用async_write时,会立即在同一线程中调用async_write_some。池中的任何线程调用io_service::run的任何形式将持续调用async_write_some以完成该写入操作。

如果需要多次调用async_write_some(数据包大于65536),这些调用可以彼此交错。

ASIO不会像您期望的那样将写入排队到套接字中,一个写入操作接着一个完成。为了确保同时考虑线程和互锁安全的写入操作,请考虑以下代码片段:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_all(),
            boost::bind(&my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                                    boost::asio::buffer(
                                           m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
                boost::asio::transfer_all(),
                boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
    const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
            boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::bind(
                            &chreosis_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
            m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }

这基本上创建了一个队列,每次只发送一个数据包。仅当第一次写入成功后才调用async_write,然后调用第一次写入的原始处理程序。

如果asio自动为每个套接字/流创建写入队列,则会更容易。


有趣!因为我从来没有想过发送这么大的数据包,所以看起来我应该没问题。 - ravenspoint
当套接字的内部缓冲区已满时,也可能会发生此情况。 - Climax
我一直怀疑线程与此相关。 - ravenspoint
交错观察很重要。人们也需要认识到,一个串行调用(非并发调用)无法防止异步方法(async_read 和 async_write)对套接字的并发访问。asio 设计要求在读取和写入处理程序中调用读取和写入操作。这导致了(1)异步方法的非并发调用,(2)它们启动的异步代码的非并发执行,以及(3)避免交错。不幸的是,这并不能满足真正并发的需求。 - evoskuil
如果我不直接从工作线程调用async_write(some_huge_data_packet),而是将写操作“post”到我的单个io_service :: run线程,那么我是否可以保证我的大数据包不会交错? - grizzlybears
@grizzlybears 不是我个人的意见和经验。 ASIO 接口在最近几年发生了很大变化。也许应该重新评估这一点。 - Climax

9

使用boost::asio::io_service::strand来处理不安全的异步处理程序。

串行执行事件处理程序的操作称为“strand”(即没有并发调用)。“strand”的使用允许在多线程程序中执行代码而无需显式锁定(例如使用互斥锁)。

计时器教程可能是理解“strand”的最简单方式。


1
@ravenspoint,您正在将异步方法与同步方法混合使用。请不要这样做。线程只在异步上下文中有用。 - Sam Miller
@evoskuil 实际上,它会这样做。有一堆的boost魔法可以实现这一点。(在你依赖它之前,了解它的工作原理非常重要,在这里没有足够的空间来解释。但是你可以在你最喜欢的搜索引擎中输入“asio_handler_invoke”或阅读这篇文章。) - David Schwartz
@DavidSchwartz,没有任何增强魔法可以使从在串上调用的处理程序创建的线程在串上执行。 - evoskuil
@evoskuil 这正是 asio_handler_invoke 所做的,它内置于 strand 类中。本质上,“处理程序”也包括一个“调用器”,用于调用处理程序以及任何先前组合操作的中介。 “调用器”在 strand 上进行调度。这就是 strand 在异步操作中的工作方式。 - David Schwartz
@DavidSchwartz 你正在做关于线程创建的假设。 - evoskuil
显示剩余9条评论

9
这个问题的关键在于:当两个不同的线程同时在单个套接字上调用async_write_some()时会发生什么。
我认为这正是不安全的操作。这些缓冲区发送到网络上的顺序是未定义的,它们甚至可能交错。特别是如果使用方便的函数async_write(),因为它在底层实现为一系列调用async_write_some(),直到整个缓冲区被发送。在这种情况下,来自两个线程的每个片段都可能随机交错发送。
唯一保护你免受此类情况的方法是构建程序以避免这种情况。
一种方法是编写应用程序层发送缓冲区,由单个线程负责将其推送到套接字上。这样,您只能保护发送缓冲区本身。但请记住,简单的std::vector不起作用,因为将字节添加到末尾可能会重新分配它,而此时可能存在未完成的async_write_some()引用它。相反,使用缓冲区的链表,利用asio的散点/聚集功能可能是一个好主意。

我没有使用aysnc_write_some或async_write。我正在使用write(http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/write.html)函数。 - ravenspoint

6
理解ASIO的关键是意识到完成处理程序仅在调用了 io_service.run() 的线程上下文中运行,无论哪个线程调用了异步方法。 如果您只在一个线程中调用了 io_service.run() ,则所有完成处理程序将按顺序在该线程的上下文中执行。 如果您在多个线程中调用了 io_service.run(),则完成处理程序将在其中一个线程的上下文中执行。 您可以将其视为线程池,其中池中的线程是已在同一 io_service 对象上调用 io_service.run() 的那些线程。
如果您有多个线程调用io_service.run(),则可以通过将它们放入strand中来强制完成处理程序进行串行化。
回答您问题的最后部分,应调用boost::async_write()。 这将将写操作分派到已调用 io_service.run()的线程上,并在写入完成后调用完成处理程序。 如果您需要对此操作进行序列化,则有点更复杂,您应阅读有关strands的文档here

我困难的一大部分是我不知道什么是“完成处理程序”。boost::asio::write()是完成处理程序吗?它会“神奇地”将写操作排队到调用io_service的run()方法的(不同)线程中,该io_service已传递给套接字构造函数吗?如果两者的答案都是肯定的,那么我的原始代码是正确的,我可以放心了! - ravenspoint
@ravenspoint:boost::asio::write()是一个同步调用,所以它会在调用它的线程上下文中立即执行。boost::asio::async_write()将函数对象作为其中之一的参数。当您调用boost::asio::async_write()时,写操作被'神奇地'排队到调用run()的线程上。 - Sean
如果你在Linux上运行,它的工作方式大致如下:当你调用io_service::run()时,底层代码会调用epoll()将当前线程阻塞,直到其中一个文件描述符上有活动发生。当你调用async_write()时,它会将套接字文件描述符添加到epoll集合中,并将完成处理程序与该文件描述符关联。当epoll返回时,底层代码会写入数据并调用处理程序。这一切发生在调用run()的线程中。 - Sean
我准备好接受 async_write() 像你描述的那样工作。问题是现在我必须防止写线程在第一个调用完成之前执行另一个 async_write() 调用。叹气。也许发送函数对象更容易?至少我已经编写并测试了代码。 - ravenspoint
1
@ravenspoint: 区别不在于一个会阻塞而另一个不会,而在于一个是同步的,另一个是异步的。为了执行异步操作,必须将操作排队。你可以执行同步操作而无需将其排队,这就是 ASIO 所做的。 - Sean
显示剩余2条评论

4

首先需要考虑的是套接字是一个流,没有内部保护来防止并发读和/或写操作。有三个不同的考虑。

  1. 访问同一套接字的函数的并发执行。
  2. 封闭同一套接字的委托的并发执行。
  3. 写入同一套接字的委托的交错执行。

聊天示例是异步的但不是并发的。io_service从单个线程中运行,使所有聊天客户端操作都是非并发的。换句话说,它避免了所有这些问题。即使是async_write在任何其他工作可以继续之前必须在内部完成发送消息的所有部分,以避免交错问题。

处理程序只由当前调用io_service的run()、run_one()、poll()或poll_one()重载的任何线程调用。

通过将工作发布到单个线程的io_service中,其他线程可以安全地避免并发和阻塞,通过在io_service中排队等待工作。但是,如果您的情况不允许您为给定套接字缓冲所有工作,则情况会变得更加复杂。您可能需要阻止套接字通信(但不是线程),而不是无限期地排队等待工作。此外,工作队列可能非常难以管理,因为它完全是不透明的。
如果您的io_service运行多个线程,仍然可以轻松避免上述问题,但是只能从其他读取或写入的处理程序(以及启动时)调用读取或写入。这样可以保持所有对套接字的访问都按顺序进行,同时保持非阻塞状态。安全性源于该模式在任何给定时间仅使用一个线程的事实。但是,从独立线程发布工作是有问题的,即使您不介意缓冲它。

Strand是一种Asio类,以确保非并发调用的方式向io_service发布工作。但是,仅使用strand来调用async_read和/或async_write只解决了三个问题中的第一个问题。这些函数在内部将工作提交到套接字的io_service。如果该服务正在运行多个线程,则可以同时执行工作。

那么,如何安全地为给定的套接字并发调用async_read和/或async_write?

  1. 对于并发调用者,第一个问题可以通过互斥锁或strand来解决,如果您不想缓冲工作,则使用前者,如果您想要缓冲工作,则使用后者。这在函数调用期间保护套接字,但对于其他问题无济于事。

  2. 第二个问题似乎最难,因为很难看到从两个函数异步执行的代码内部发生了什么。异步函数都会将工作提交到套接字的io_service。

来自boost socket源代码:

/**
 * This constructor creates a stream socket without opening it. The socket
 * needs to be opened and then connected or accepted before data can be sent
 * or received on it.
 *
 * @param io_service The io_service object that the stream socket will use to
 * dispatch handlers for any asynchronous operations performed on the socket.
 */
explicit basic_stream_socket(boost::asio::io_service& io_service)
: basic_socket<Protocol, StreamSocketService>(io_service)
{
}

并且来自 io_service::run()

/**
 * The run() function blocks until all work has finished and there are no
 * more handlers to be dispatched, or until the io_service has been stopped.
 *
 * Multiple threads may call the run() function to set up a pool of threads
 * from which the io_service may execute handlers. All threads that are
 * waiting in the pool are equivalent and the io_service may choose any one
 * of them to invoke a handler.
 *
 * ...
 */
BOOST_ASIO_DECL std::size_t run();

如果给一个套接字分配多个线程,它就必须使用多个线程——尽管不是线程安全的。避免这个问题的唯一方法(除了替换套接字实现)是只给套接字分配一个线程来工作。对于单个套接字,这正是你想要的(所以不要费心去写替代品)。
第三个问题可以通过使用(不同的)互斥锁来解决,在异步写入之前锁定该锁,传递到完成处理程序中并在那一点上解锁。这将防止任何调用者在前面的写入的所有部分都完成之前开始写入。
请注意,异步写入将工作发布到队列中——这就是它能够几乎立即返回的原因。如果你向它投入过多的工作,你可能需要处理一些后果。尽管为套接字使用单个io_service线程,但你可能会有许多线程通过并发或非并发调用async_write来发布工作。
另一方面,异步读取很简单。没有交错问题,你只需从上一个调用的处理程序回到循环。你可能希望将结果的工作分派到另一个线程或队列,但如果你在完成处理程序线程上执行它,你只是阻塞了所有读写操作在你的单线程套接字上。
更新
我对套接字流的底层实现进行了更深入的研究(针对某个平台)。看起来情况是这样的:套接字始终在调用线程上执行平台套接字调用,而不是委派给io_service的代理。换句话说,尽管async_read和async_write似乎立即返回,但它们实际上在返回之前执行所有套接字操作。只有处理程序被发布到io_service。尽管我查阅的示例代码没有记录或公开此信息,但假设它是保证的行为,则显着影响上述第二个问题。
假设发布到io_service的工作不包含套接字操作,则无需将io_service限制为单个线程。但这也强调了防止异步函数并发执行的重要性。例如,如果按照聊天示例添加另一个线程到io_service中,则会出现问题。由于异步函数调用在函数处理程序内部执行,因此存在并发函数执行的情况。这将需要使用互斥锁,或重新发布所有异步函数调用以在strand上执行。 更新2 关于第三个问题(交错),如果数据大小超过65536字节,则工作在async_write内部被分解并分段发送。但是需要理解的是,如果io_service中有多个线程,则除第一个工作块外的其他工作块将被发布到不同的线程。这一切都发生在调用完成处理程序之前,即在async_write函数内部。实现会创建自己的中间完成处理程序,并使用它们来执行除第一个套接字操作以外的所有操作。

这意味着,如果有多个io_service线程并且要发布超过64kb的数据(默认情况下可能有所不同),则对async_write调用的任何保护(互斥锁或strand)都无法保护套接字。因此,在这种情况下,交错保护不仅对于交错安全性而言是必需的,而且对于套接字的线程安全性也是必需的。我在调试器中验证了所有这些。

互斥锁选项

async_read和async_write函数在内部使用io_service获取线程来发布完成处理程序,会阻塞直到线程可用。这使得它们难以使用互斥锁进行保护。当互斥锁用于保护这些函数时,当线程反向锁定时会发生死锁,导致io_service饥饿。考虑到没有其他方法可以在使用多线程io_service发送> 64k时保护async_write,因此在该场景下它有效地将我们锁定到单个线程中 - 当然解决了并发性问题。

2
根据2008年11月的boost 1.37 asio更新,包括写入在内的某些同步操作“现在是线程安全的”,允许“在单个套接字上进行并发同步操作(如果操作系统支持)”boost 1.37.0历史记录。这似乎支持您所看到的情况,但是在boost文档中仍然存在过于简化的“共享对象:不安全”条款适用于ip::tcp::socket。

感谢您的查看。很遗憾没有人来给出明确的答案,只有其他读者的文档。顺便说一下,您的XML编辑器foxe非常棒! - ravenspoint

1

一条关于旧帖子的评论...

我认为asio文档中asio::async_write()重载的关键句子是以下内容:

此操作是通过对流的async_write_some函数零个或多个调用来实现的,被称为组合操作。程序必须确保在此操作完成之前,流不执行任何其他写操作(例如async_write、流的async_write_some函数或执行写操作的任何其他组合操作)。

据我所知,这说明了许多以上答案中的假设:如果多个线程执行io_context.run(),则从对asio::async_write的调用中获取的数据可能会交错

也许这可以帮助某些人;-)


0

这取决于您是否从多个线程访问同一套接字对象。假设您有两个线程运行相同的io_service::run()函数。

例如,如果您同时进行读写或者可能从其他线程执行取消操作,则不安全。

但是,如果您的协议一次只执行一个操作。

  1. 如果只有一个线程运行io_service run,则没有问题。如果您想从其他线程上执行某些操作,则可以调用带有处理程序的io_service::post(),该处理程序在套接字上执行此操作,因此它将在同一线程中执行。
  2. 如果您有多个线程执行io_service::run并且尝试同时执行操作-例如取消和读取操作,则应使用strands。在Boost.Asio文档中有一个教程。

套接字从多个线程访问。并非所有线程都使用io_service。 - ravenspoint

-2

我一直在进行广泛的测试,但无法破坏asio。即使没有锁定任何互斥量。

尽管如此,我建议您在每个调用周围使用async_readasync_write互斥量。

我相信唯一的缺点是,如果有多个线程调用io_service::run,则可能会同时调用完成处理程序。

在我的情况下,这不是问题。以下是我的测试代码:

#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/asio.hpp>
#include <vector>

using namespace std;
char databuffer[256];
vector<boost::asio::const_buffer> scatter_buffer;
boost::mutex my_test_mutex;
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_service *io) {
while(1) {
    boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));

    //my_test_mutex.lock(); // It would be safer 
    socket->async_send(scatter_buffer, boost::bind(&mycallback));
    //my_test_mutex.unlock(); // It would be safer
}
}
int main(int argc, char **argv) {

for(int i = 0; i < 256; ++i)
    databuffer[i] = i;

for(int i = 0; i < 4*90; ++i)
    scatter_buffer.push_back(boost::asio::buffer(databuffer));
boost::asio::io_service my_test_ioservice;
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice);
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice);
boost::asio::ip::tcp::resolver::query  my_test_tcp_query("192.168.1.10", "40000");
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query);
boost::asio::connect(my_test_socket, my_test_tcp_iterator);
for (size_t i = 0; i < 8; ++i) {
    boost::shared_ptr<boost::thread> thread(
            new boost::thread(my_test_func, &my_test_socket, &my_test_ioservice));
}

while(1) {
    my_test_ioservice.run_one();
    boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
}
return 0;

}

这是我用Python搭建的临时服务器:

import socket
def main():
    mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    mysocket.bind((socket.gethostname(), 40000))
    mysocket.listen(1)

    while 1:
        (clientsocket, address) = mysocket.accept()
        print("Connection from: " + str(address))
        i = 0
        count = 0
        while i == ord(clientsocket.recv(1)):
            i += 1
            i %= 256

            count+=1
            if count % 1000 == 0:
                print(count/1000)
        print("Error!")
return 0

if __name__ == '__main__':
    main()

请注意运行此代码可能会导致计算机出现过度交换的情况。

这段代码并没有正确地进行测试。它应该发送比65536大得多的数据包,以说明asio在交错async_write调用时不具备固有的线程安全性。 - Climax
1
互斥锁不保护从临界区异步调用的代码。 - evoskuil
但我同意在异步读取和异步写入的并发执行周围使用互斥锁是一个好主意,因为套接字不是线程安全的,否则会被同时访问。 - evoskuil

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接