我的问题
在使用两个线程发送和接收asio::ip::tcp::iostream
时,如何避免数据竞争?
设计
我正在编写一个程序,使用asio::ip::tcp::iostream
进行输入和输出。该程序通过5555端口从(远程)用户接收命令,并通过同一TCP连接向用户发送消息。由于这些事件(从用户接收到的命令或发送给用户的消息)是异步发生的,因此我有单独的传输和接收线程。
在这个玩具版本中,命令是“one”,“two”和“quit”。当然,“quit”退出程序。其他命令不起作用,任何无法识别的命令都会导致服务器关闭TCP连接。
发送的消息是简单的序列号消息,每秒发送一次。
在这个玩具版本和我尝试编写的真实代码中,传输和接收过程都使用阻塞IO,因此似乎没有好的方法可以使用std::mutex
或其他同步机制。(在我的尝试中,一个进程会抓住互斥锁然后阻塞,这对此不起作用。)
构建和测试
为了构建和测试这个程序,我在64位Linux机器上使用gcc版本7.2.1和valgrind 3.13。构建:
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
为了测试,我使用以下命令运行服务器:
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
接着我在另一个窗口中使用telnet 127.0.0.1 5555
来创建与服务器的连接。 helgrind
正确指出的问题是,由于runTx
和runRx
都在异步访问同一流,所以存在数据竞争:
==16188== 在线程#1读取大小为1的0x1FFEFFF1CC时可能存在数据竞争
==16188== 持有的锁:无
...更多省略的行
concurrent.cpp
#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream *in, std::ostream *out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream *in);
int runRx(std::ostream *out);
bool want_quit;
bool want_reset;
};
int Console::runTx(std::istream *in) {
static const std::array<std::string, 3> cmds{
"quit", "one", "two",
};
std::string command;
while (!want_quit && !want_reset && *in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream *out) {
for (int i=0; !(want_reset || want_quit); ++i) {
(*out) << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
out->flush();
}
return 0;
}
int Console::run(std::istream *in, std::ostream *out) {
want_reset = false;
std::thread t1{&Console::runRx, this, out};
int status = runTx(in);
t1.join();
return status;
}
int main()
{
Console con;
asio::io_service ios;
// IPv4 address, port 5555
asio::ip::tcp::acceptor acceptor(ios,
asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
con.run(&stream, &stream);
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}