我要做什么
我正在尝试模拟以下简单的socat(1)
命令的行为:
socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'
上述命令创建了一个分叉TCP服务器,为每个连接派生并执行
SOME_PROGRAM
,将该命令的stdin
和stdout
重定向到TCP套接字中。我想实现以下内容:
1.使用
asyncio
创建简单的TCP服务器以处理多个并发连接。
2.收到连接时,启动SOME_PROGRAM
作为子进程。
3.将从套接字接收到的任何数据传递给SOME_PROGRAM
的标准输入。
4.将从SOME_PROGRAM
的标准输出接收到的任何数据传递给套接字。
5.SOME_PROGRAM
退出时,向套接字写入一条告别消息和退出代码,并关闭连接。我想在纯Python中完成这个任务,不使用外部库,使用
asyncio
模块。我目前所拥有的是:
这是我目前编写的代码:
import asyncio
class ServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.client_addr = transport.get_extra_info('peername')
self.transport = transport
self.child_process = None
print('Connection with {} enstablished'.format(self.client_addr))
asyncio.ensure_future(self._create_subprocess())
def connection_lost(self, exception):
print('Connection with {} closed.'.format(self.client_addr))
if self.child_process.returncode is not None:
self.child_process.terminate()
def data_received(self, data):
print('Data received: {!r}'.format(data))
# Make sure the process has been spawned
# Does this even make sense? Looks so awkward to me...
while self.child_process is None:
continue
# Write any received data to child_process' stdin
self.child_process.stdin.write(data)
async def _create_subprocess(self):
self.child_process = await asyncio.create_subprocess_exec(
*TARGET_PROGRAM,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
# Start reading child stdout
asyncio.ensure_future(self._pipe_child_stdout())
# Ideally I would register some callback here so that when
# child_process exits I can write to the socket a goodbye
# message and close the connection, but I don't know how
# I could do that...
async def _pipe_child_stdout(self):
# This does not seem to work, this function returns b'', that is an
# empty buffer, AFTER the process exits...
data = await self.child_process.stdout.read(100) # Arbitrary buffer size
print('Child process data: {!r}'.format(data))
if data:
# Send to socket
self.transport.write(data)
# Reschedule to read more data
asyncio.ensure_future(self._pipe_child_stdout())
SERVER_PORT = 6666
TARGET_PROGRAM = ['./test']
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
server = loop.run_until_complete(coro)
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
我还要尝试作为子进程运行的 ./test
程序:
#!/usr/bin/env python3
import sys
if sys.stdin.read(2) == 'a\n':
sys.stdout.write('Good!\n')
else:
sys.exit(1)
if sys.stdin.read(2) == 'b\n':
sys.stdout.write('Wonderful!\n')
else:
sys.exit(1)
sys.exit(0)
很不幸,以上代码并没有起到作用,我有些迷茫,不知道接下来该尝试什么。
预期工作正常的情况:
- 子进程正确启动,并似乎正确地从套接字接收输入,因为我可以从
htop
中看到它,而且只要我发送b\n
, 它就会终止。
预期工作不正常的情况:
基本上任何其他的情况...
- 子进程的输出从未发送到套接字,实际上根本没有被读取。调用
await self.child_process.stdout.read(100)
似乎永远不会终止:相反,它只在子进程死亡后终止,结果只是一个空的bytes
对象b''
。 - 我无法理解子进程何时终止:如上所述,当发生这种情况时,我想向套接字发送“Goodbye”消息,以及
self.child_process.returncode
,但我不知道如何以有意义的方式实现这一点。
我尝试过的::
- 我尝试使用
asyncio.loop.subprocess_exec()
创建子进程,而不是使用asyncio.create_subprocess_exec()
。这解决了知道进程何时终止的问题,因为我可以实例化一个子类asyncio.SubprocessProtocol
并使用其process_exited()
方法,但是,这并没有真正帮助我,因为如果我用这种方式做,我就无法再与进程的stdin
或stdout
交互了!也就是说,我没有与进程交互的Process
对象了... - 我尝试使用
asyncio.loop.connect_write_pipe()
和loop.connect_read_pipe()
,但没有成功。
问题
所以,有人能帮我弄清楚我做错了什么吗?肯定有一种方法可以使这个工作顺利进行。当我最开始时,我正在寻找一种轻松使用一些管道重定向的方法,但是我不知道现在是否可能。是吗?看起来应该可以。
asyncio
编写TCP服务器。我不知道的是如何让客户端与子进程通信。 - Marco Bonelliasyncio.create_subprocess_exec
替换客户端处理程序中的asyncio.open_connection
。 - Vincent