为什么 asyncio subprocess 在没有设置事件循环时表现不同?

3
我有一段简单的异步代码,它会生成 sleep 3 并等待它完成:
from asyncio import SelectorEventLoop, create_subprocess_exec, \
    wait_for, get_event_loop, set_event_loop


def run_timeout(loop, awaitable, timeout):
    timed_awaitable = wait_for(awaitable, timeout=timeout, loop=loop)
    return loop.run_until_complete(timed_awaitable)

async def foo(loop):
    process = await create_subprocess_exec('sleep', '3', loop=loop)
    await process.wait()
    print(process.returncode)

注意它需要一个自定义的循环。如果我使用以下内容运行它:
loop = get_event_loop()
run_timeout(loop, foo(loop), 5)
loop.close()

它按预期工作(在3秒钟的sleep 3成功完成并打印0)。但是,如果我使用自己的事件循环运行它:
loop = SelectorEventLoop()
run_timeout(loop, foo(loop), 5)
loop.close()

我遇到了一个TimeoutError错误(来自于run_timeout中的wait_for)。
Traceback (most recent call last):
  File "test.py", line 15, in <module>
    _run_async(loop, foo(loop), 5)
  File "test.py", line 7, in _run_async
    return loop.run_until_complete(timed_coroutine)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 396, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

我能让自定义事件循环工作的唯一方法是在创建自己的SelectorEventLoop之后调用set_event_loop()
loop = SelectorEventLoop()
set_event_loop(loop)
run_timeout(loop, foo(loop), 3)
loop.close()

这里怎么回事?我是否误解了文档?必须将所有事件循环(您使用的)都设置为默认值吗?如果是这样,那么允许传递自定义循环给许多异步方法(例如 create_subprocess_exec 和 wait_for)似乎毫无意义,因为您可以传递的唯一值是 get_event_loop(),它就是默认值。

这似乎是asyncio.subprocess中的一个错误。在Python 3.6下运行代码会报告a different exception - user4815162342
有趣!这个异常有意义吗?还是3.6引发的错误也是一个bug?在运行run_timeout()之前执行set_event_loop()是否可以抑制3.6的错误? - Bailey Parker
我怀疑这两个异常都是同一个错误的不同表现。(3.6版本的代码更加谨慎,因此它立即注意到有些地方不对劲。) - user4815162342
但是还有一些不对的地方:为什么你的自定义循环代码在超时时指定了3?难道不应该期望等待3秒钟让sleep 3完成(因为子进程需要一些时间来设置)而出现超时吗? - user4815162342
那很有道理。我会提交一个 bug :) 感谢您的见解。我还没有太多地尝试过 asyncio。另外,那只是一个打字错误。对于自定义循环代码,超时时间也是 5。 - Bailey Parker
显示剩余2条评论
1个回答

1
很奇怪,我调试了程序,发现很难说这是一个bug。
简而言之,在执行create_subprocess_exec时,你不仅需要一个事件循环,还需要一个子进程监视器(用于监视子进程)。但是create_subprocess_exec没有提供一种方法让你设置自定义的子进程监视器,它只使用默认监视器,该监视器附加到默认事件循环而不是当前正在运行的事件循环。
如果您使用以下代码,它将起作用:
from asyncio import SelectorEventLoop, create_subprocess_exec, \
    wait_for, get_event_loop, set_event_loop, get_child_watcher


def run_timeout(loop, awaitable, timeout):
    timed_awaitable = wait_for(awaitable, timeout=timeout)
    return loop.run_until_complete(timed_awaitable)

async def foo():
    process = await create_subprocess_exec('sleep', '3')
    await process.wait()
    print(process.returncode)

loop = SelectorEventLoop()
# core line, get default child watcher and attach it to your custom loop.
get_child_watcher().attach_loop(loop)
run_timeout(loop, foo(), 5)
loop.close()

如果您使用set_event_loop来设置默认循环,它还将重新连接默认子监视器到新的默认循环。这就是为什么它能够工作的原因。

很难确定这是一个错误还是API设计问题。 create_subprocess_exec 是否允许您传递自定义监视器?如果允许,它会导致混淆,因为只有在使用子进程时才会涉及到子监视器。


很难说这是一个 bug 还是 API 设计问题。因为 create_subprocess_exec 允许传递显式的 loop,所以它无条件地使用默认循环中的默认 watcher 显然听起来像是一个 bug。 - user4815162342
我并不完全认同watcherloop的关系就像是flask-extensionflask一样。实际上,对于loop本身来说,它运行得非常好。OP甚至可以只使用loop.run_until_complete(timed_awaitable)而不传递任何冗余的loop,例如wait_for(awaitable, timeout=timeout, loop=loop)create_subprocess_exec('sleep', '3', loop=loop)。但如果API设计原则是隐藏很少使用的watcher,那么create_subprocess_exec应该自动检测watcher是否连接到正在运行的loop - Sraw
我不确定我完全理解Flask类比的意思,因为我没有使用过Flask,但是3.6引发的异常显示出代码并没有明确提到子进程监视器。这不是一个错误吗?我理解使用默认循环时一切都会正常工作,但是我的理解是,使用非默认循环得到asyncio支持,并且不需要了解更高级的概念,例如子进程监视器,应该可以正常工作。 - user4815162342
如果我的理解是正确的,我认为我同意@user4815162342的观点。我正在使用3.5.2,所以这不适用,但他们在问题上的评论指出,在async上下文中调用get_event_loop()将返回正在运行async的循环。因此,该循环应该设置此子监视器。由于create_subprocess_exec在此上下文中运行,因此监视器是否可用? - Bailey Parker
@BaileyParker 我已在3.6.4上测试了这个片段,它可以正常工作。也许在3.5.2上是个bug?wait_for只是创建了一个可等待实例,loop.run_until_complete运行此实例,因此它将附加到此loop - Sraw
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接