使用asyncio实现的简单Python TCP fork服务器

7

我要做什么

我正在尝试模拟以下简单的socat(1)命令的行为:

socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'

上述命令创建了一个分叉TCP服务器,为每个连接派生并执行SOME_PROGRAM,将该命令的stdinstdout重定向到TCP套接字中。
我想实现以下内容:
1.使用asyncio创建简单的TCP服务器以处理多个并发连接。 2.收到连接时,启动SOME_PROGRAM作为子进程。 3.将从套接字接收到的任何数据传递给SOME_PROGRAM的标准输入。 4.将从SOME_PROGRAM的标准输出接收到的任何数据传递给套接字。 5.SOME_PROGRAM退出时,向套接字写入一条告别消息和退出代码,并关闭连接。
我想在纯Python中完成这个任务,不使用外部库,使用asyncio模块。
我目前所拥有的是:
这是我目前编写的代码:
import asyncio

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.client_addr   = transport.get_extra_info('peername')
        self.transport     = transport
        self.child_process = None

        print('Connection with {} enstablished'.format(self.client_addr))

        asyncio.ensure_future(self._create_subprocess())

    def connection_lost(self, exception):
        print('Connection with {} closed.'.format(self.client_addr))

        if self.child_process.returncode is not None:
            self.child_process.terminate()

    def data_received(self, data):
        print('Data received: {!r}'.format(data))

        # Make sure the process has been spawned
        # Does this even make sense? Looks so awkward to me...
        while self.child_process is None:
            continue

        # Write any received data to child_process' stdin
        self.child_process.stdin.write(data)

    async def _create_subprocess(self):
        self.child_process = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )

        # Start reading child stdout
        asyncio.ensure_future(self._pipe_child_stdout())

        # Ideally I would register some callback here so that when
        # child_process exits I can write to the socket a goodbye
        # message and close the connection, but I don't know how
        # I could do that...

    async def _pipe_child_stdout(self):
        # This does not seem to work, this function returns b'', that is an
        # empty buffer, AFTER the process exits...
        data = await self.child_process.stdout.read(100) # Arbitrary buffer size

        print('Child process data: {!r}'.format(data))

        if data:
            # Send to socket
            self.transport.write(data)
            # Reschedule to read more data
            asyncio.ensure_future(self._pipe_child_stdout())


SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

我还要尝试作为子进程运行的 ./test 程序:

#!/usr/bin/env python3

import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
else:
    sys.exit(1)

sys.exit(0)

很不幸,以上代码并没有起到作用,我有些迷茫,不知道接下来该尝试什么。

预期工作正常的情况:

  • 子进程正确启动,并似乎正确地从套接字接收输入,因为我可以从htop中看到它,而且只要我发送b\n, 它就会终止。

预期工作不正常的情况:

基本上任何其他的情况...

  • 子进程的输出从未发送到套接字,实际上根本没有被读取。调用await self.child_process.stdout.read(100)似乎永远不会终止:相反,它只在子进程死亡后终止,结果只是一个空的bytes对象b''
  • 我无法理解子进程何时终止:如上所述,当发生这种情况时,我想向套接字发送“Goodbye”消息,以及self.child_process.returncode,但我不知道如何以有意义的方式实现这一点。

我尝试过的::

问题

所以,有人能帮我弄清楚我做错了什么吗?肯定有一种方法可以使这个工作顺利进行。当我最开始时,我正在寻找一种轻松使用一些管道重定向的方法,但是我不知道现在是否可能。是吗?看起来应该可以。


你看过这个问题吗:https://dev59.com/zlYM5IYBdhLWcg3wrRqC? - sanyassh
尝试在调试模式下使用asyncio运行,可能会有用。 - xrisk
@Sanyash 我现在有了,但我真的不知道那怎么能帮助。我知道如何使用asyncio编写TCP服务器。我不知道的是如何让客户端与子进程通信。 - Marco Bonelli
你可能想尝试一种更高级的方法,类似于这个简单的tcp代理,用asyncio.create_subprocess_exec替换客户端处理程序中的asyncio.open_connection - Vincent
1个回答

8

您的代码有两个即时实施问题:

  • 服务器在将接收到的数据传输到子进程之前会剥离其中的空格。这将删除尾随的换行符,因此如果TCP客户端发送"a\n",则子进程将只收到"a"。这样,子进程永远不会遇到预期的"a\n"字符串,并且它总是在读取两个字节后终止。这解释了来自子进程的空字符串(EOF)。(已在后续编辑中删除剥离操作。)
  • 子进程不刷新其输出,因此服务器不会接收到任何写入。仅在子进程退出或填满其输出缓冲区时才会看到这些写入,当显示短调试消息时,输出缓冲区的大小以千字节为单位,需要一段时间才能填满。
另一个问题在于设计层面。如评论所述,除非您明确意图实现新的asyncio协议,否则建议使用更高级别的基于流的API,例如“stream-based API”,在这种情况下是推荐使用{{link3:start_server}}函数。像SubprocessProtocolconnect_write_pipeconnect_read_pipe这样的更低级别功能也不是应用程序代码中想要使用的内容。本答案的其余部分假定采用基于流的实现。 start_server接受一个协程,每当客户端连接时就会生成一个新任务。它使用两个asyncio流参数调用,一个用于读取,一个用于写入。该协程包含与客户端通信的逻辑;在您的情况下,它将生成子进程并在其与客户端之间传输数据。
请注意,套接字和子进程之间的双向数据传输不能通过简单的读取后写入的循环实现。例如,请考虑以下循环:
# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
    proc_data = await child.stdout.read(1024)  # (1)
    sock_writer.write(proc_data)
    sock_data = await sock_reader.read(1024)
    child.stdin.write(sock_data)               # (2)

这种循环容易出现死锁。如果子进程正在响应来自TCP客户端接收到的数据,它有时只会在接收到一些输入后提供输出。这将无限期地阻塞循环中的(1),因为它只能在稍后发送sock_data给子进程后从子进程的stdout获得数据,这发生在(2)。事实上,(1)等待(2),反之亦然,构成了死锁。请注意,反转传输顺序也无法解决问题,因为如果TCP客户端正在处理服务器子进程的输出,则循环将死锁。

使用asyncio,很容易避免这种死锁:只需同时生成两个协程,一个从套接字传输数据到子进程的stdin,另一个从子进程的stdout传输数据到套接字。例如:

# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
    while True:
        data = await src.read(1024)
        if data == b'':
            break
        dest.write(data)

child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()

这种设置与第一个while循环的区别在于两个传输是独立的。死锁不会发生,因为从套接字读取的操作不会等待子进程读取,反之亦然。
应用到问题中,整个服务器将如下所示:
import asyncio

class ProcServer:
    async def _transfer(self, src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)

    async def _handle_client(self, r, w):
        loop = asyncio.get_event_loop()
        print(f'Connection from {w.get_extra_info("peername")}')
        child = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        sock_to_child = loop.create_task(self._transfer(r, child.stdin))
        child_to_sock = loop.create_task(self._transfer(child.stdout, w))
        await child.wait()
        sock_to_child.cancel()
        child_to_sock.cancel()
        w.write(b'Process exited with status %d\n' % child.returncode)
        w.close()

    async def start_serving(self):
        await asyncio.start_server(self._handle_client,
                                   '0.0.0.0', SERVER_PORT)

SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server = ProcServer()
    loop.run_until_complete(server.start_serving())
    loop.run_forever()

附带的test程序也必须进行修改,每次调用sys.stdout.write()后必须调用sys.stdout.flush(),否则消息会滞留在其stdio缓冲区中而不是被发送到父进程。

当我刚开始时,我正在寻找一种轻松使用一些管道重定向的方法,但我不知道现在是否可能。这样做可以吗?看起来应该可以。

在类Unix系统上,肯定可以将套接字重定向到生成的子进程,以便子进程直接与客户端通信。(旧版的inetd Unix服务器就是这样工作的。) 但是,出于两个原因,asyncio不支持这种操作方式:
  • 它不适用于Python和asyncio支持的所有系统,特别是Windows系统。
  • 它与核心asyncio功能不兼容,例如传输/协议和流,这些功能假定对底层套接字具有所有权和独占访问权限。
即使您不关心可移植性,也要考虑第二点:您可能需要处理或记录TCP客户端和子进程之间交换的数据,如果它们在内核级别上被焊接在一起,那么您将无法做到这一点。此外,在使用asyncio协程处理时,超时和取消操作要比仅处理不透明子进程时更容易实现。
如果您的用例不需要可移植性和无法控制通信,则可能根本不需要使用asyncio-没有什么可以阻止您生成运行经典阻塞服务器的线程,该服务器使用与在C中编写的os.fork, os.dup2, and os.execlp相同的序列来处理每个客户端。 编辑 正如OP在评论中指出的那样,原始代码通过杀死子进程来处理TCP客户端断开连接的情况。在流层面上,连接丢失会通过流信号文件结束或引发异常来反映。在上述代码中,可以通过用更具体的协程替换通用的self._transfer()来轻松地对连接丢失做出反应。例如,可以使用以下代码代替:
sock_to_child = loop.create_task(self._transfer(r, child.stdin))

...可以写成:

sock_to_child = loop.create_task(self._sock_to_child(r, child))

并像这样定义_sock_to_child(未经测试):

async def _sock_to_child(self, reader, child):
    try:
        await self._transfer(reader, child.stdin)
    except IOError as e:
        # IO errors are an expected part of the workflow,
        # we don't want to propagate them
        print('exception:', e)
    child.kill()

如果子进程比TCP客户端活得更久,那么child.kill()这一行很可能永远不会执行,因为在_transfer()中的src.read()被挂起时,协程将被_handle_client取消。请注意,HTML标签已保留。

首先,非常感谢您花时间回答我的问题!对于数据上的.strip()是一个编辑错误,我很抱歉,但是感谢您指出。您也是正确的,我应该在写入后flush() stdout。不过,您的代码有一个问题:与我的代码不同,如果客户端关闭连接,则子进程永远不会终止。这至关重要,而我正是为此原因而子类化asyncio.Protocol(即connection_lost回调)。即使使用您更简单的方法,是否可以解决此问题? - Marco Bonelli
好的,我明白了,老实说看起来有点笨拙,但它能完成工作。不过要注意一点:try-except块是没有必要的,因为当到达EOF(即客户端断开连接)时,src.read()返回b'',即使我们在客户端断开连接后尝试从套接字中read() 也是如此。另外,你是正确的,整个asyncio模块和逻辑在我开始尝试使用它时确实很令人困惑。再次感谢 :) - Marco Bonelli
@MarcoBonelli 如果客户端干净地断开连接,那么这是正确的,但是在网络故障和超时的情况下,您很容易遇到IOError。如果您使用过BSD套接字进行编程,您会看到“对等方重置连接”和类似的错误 - 这就是StreamReader.read将引发的异常类型。 - user4815162342
@MarcoBonelli 关于笨拙的部分,你是指整个流概念还是编辑部分不明显?一旦你深入了解应用程序的具体细节,我认为代码就不能比那更优雅了... - user4815162342
我在谈论编辑。对我来说,没有连接丢失或进程被杀死的回调有点让人烦恼,但我想这只是因为我更习惯于使用回调而不是没有回调。 - Marco Bonelli
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接