如何避免使用`asio::ip::tcp::iostream`时出现数据竞争?

7

我的问题

在使用两个线程发送和接收asio::ip::tcp::iostream时,如何避免数据竞争?

设计

我正在编写一个程序,使用asio::ip::tcp::iostream进行输入和输出。该程序通过5555端口从(远程)用户接收命令,并通过同一TCP连接向用户发送消息。由于这些事件(从用户接收到的命令或发送给用户的消息)是异步发生的,因此我有单独的传输和接收线程。

在这个玩具版本中,命令是“one”,“two”和“quit”。当然,“quit”退出程序。其他命令不起作用,任何无法识别的命令都会导致服务器关闭TCP连接。

发送的消息是简单的序列号消息,每秒发送一次。

在这个玩具版本和我尝试编写的真实代码中,传输和接收过程都使用阻塞IO,因此似乎没有好的方法可以使用std::mutex或其他同步机制。(在我的尝试中,一个进程会抓住互斥锁然后阻塞,这对此不起作用。)

构建和测试

为了构建和测试这个程序,我在64位Linux机器上使用gcc版本7.2.1和valgrind 3.13。构建:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

为了测试,我使用以下命令运行服务器:

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

接着我在另一个窗口中使用telnet 127.0.0.1 5555来创建与服务器的连接。 helgrind正确指出的问题是,由于runTxrunRx都在异步访问同一流,所以存在数据竞争:

==16188== 在线程#1读取大小为1的0x1FFEFFF1CC时可能存在数据竞争

==16188== 持有的锁:无

...更多省略的行

concurrent.cpp

#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two", 
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios, 
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

这将是一个生产者-消费者模式。有几种不同的解决方案可用,其中一些没有显式使用信号量或类似物。 - C. Gonzalez
1个回答

2

是的,你正在共享底层流的套接字,而没有同步。

Sidenote, same with the boolean flags, which can easily be "fixed" by changing:

std::atomic_bool want_quit;
std::atomic_bool want_reset;

如何解决

说实话,我不认为有一个好的解决方案。你自己说了:操作是异步的,如果你试图同步执行它们,那么你会遇到麻烦。

你可以尝试想出一些方法。如果我们基于相同的底层套接字(文件描述符)创建一个单独的流对象,那该怎么办呢?由于这样的流不是 Asio 的一部分,所以这并不容易。

但我们可以使用 Boost Iostreams 来进行一些修改:

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

// .... later:

    // HACK: procure a _separate `ostream` to prevent the race, using the same fd
    namespace bio = boost::iostreams;
    bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
    bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

    con.run(stream, hack_ostream);

事实上,这可以在没有竞争的情况下运行(同时读写同一Socket 是可以的,只要你不共享非线程安全Asio对象来包装它们)。

我建议您这样做:

不要这样做。这是一个临时的解决方案。显然,您正在复杂化事情,试图避免使用异步代码。我会咬紧牙关。

将IO机制从服务逻辑中提取出来并不需要太多工作。 最终,您将不受任何随机限制(您可以考虑处理多个客户端,甚至可以完全不使用任何线程等)。

如果您想了解一些中间地带,请查看Stackful协程(http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html)。

列表

仅供参考

请注意,我已经重构以消除指针的需要。你不会转移所有权,所以引用就行了。如果您不知道如何将引用传递给bind / std :: thread 构造函数,则可以看到std :: ref 的诡计。

(为了进行压力测试,我大大降低了延迟。)

在Coliru上实时查看

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream &in, std::ostream &out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream &in);
    int runRx(std::ostream &out);
    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
};

int Console::runTx(std::istream &in) {
    static const std::array<std::string, 3> cmds{
        {"quit", "one", "two"}, 
    };
    std::string command;
    while (!want_quit && !want_reset && in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream &out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        out << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        out.flush();
    }
    return 0;
}

int Console::run(std::istream &in, std::ostream &out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, std::ref(out)};
    int status = runTx(in);
    t1.join();
    return status;
}

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

int main()
{
    Console con;
    boost::asio::io_service ios;

    // IPv4 address, port 5555
    boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});

    while (!con.getQuitValue()) {
        boost::asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());

        {
            // HACK: procure a _separate `ostream` to prevent the race, using the same fd
            namespace bio = boost::iostreams;
            bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
            bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

            con.run(stream, hack_ostream);
        }

        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

测试:

netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2

并且
commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)

该程序无限运行,偶尔会重置连接(当发送命令“three”时)。


“将IO机制从服务逻辑中分离出来并不需要太多工作量。”我不理解您的意思,能否详细说明一下? - Edward
当然。在您的代码中,不要将请求循环与istream绑定,而是使其从“任何地方”接收请求消息(插入您可以想到的最简单的接口[https://paste.ubuntu.com/26336487/]),然后您就可以自由地用异步实现替换IO实现,而不必担心复杂性增加。它被隐藏和分隔。 - sehe
好的,将iO与处理分离是有道理的,但我不知道你在这个上下文中所说的“异步代码”是什么意思。这是我尝试编写异步代码的方式 - 毫无疑问,有更好的方法来实现它,但是怎么做呢? - Edward
@Edward,你深刻地意识到你的代码没有并发性,但也不是异步的。这表现在IO操作会阻塞的事实上。使用async_*版本的调用可以获得异步IO,但这也是Asio的噩梦。[这也意味着你将无法使用iostream抽象,尽管你仍然可以使用std::[io]streamboost::asio::streambuf] - sehe
好的,我想我会回到asio文档上再试着理解一下。 显然,我仍然缺少某些基本知识,但不知道是什么。 无论如何,我感谢您花费的时间。 - Edward

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接