如何使用asyncio优雅地设置超时时间

18

因此,在添加try/catch块之前,当进程运行时间少于5分钟时,我的事件循环会正常关闭,但是在添加try/catch块后,当进程超过5分钟时,我开始收到这个错误

async def run_check(shell_command):
    p = await asyncio.create_subprocess_shell(shell_command,
                    stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    fut = p.communicate()
    try:
        pcap_run = await asyncio.wait_for(fut, timeout=5)
    except asyncio.TimeoutError:
        pass

def get_coros():
    for pcap_loc in print_dir_cointent():
        for pcap_check in get_pcap_executables():
            tmp_coro = (run_check('{args}'
            .format(e=sys.executable, args=args)))
            if tmp_coro != False:
                coros.append(tmp_coro)
     return coros

async def main(self):
    p_coros = get_coros()
    for f in asyncio.as_completed(p_coros):
        res = await f




loop = asyncio.get_event_loop()
loop.run_until_complete(get_coros())
loop.close()

Traceback:

追踪信息:
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of 
    <_UnixSubprocessTransport closed pid=171106 running stdin=
    <_UnixWritePipeTransport closing fd=8 open> stdout=<_UnixReadPipeTransport fd=9 open>>>
    Traceback (most recent call last):
      File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
      File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
      File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
      File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
      File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
      File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
      File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
    RuntimeError: Event loop is closed

当我的代码执行到最后一行时,出现了回溯错误。

调试日志:

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:run shell command '/local/p_check w_1.pcap --json' stdin=<pipe> stdout=stderr=<pipe>
DEBUG:asyncio:process '/local/p_check w_1.pcap --json' created: pid 171289DEBUG:asyncio:Write pipe 8 connected: (<_UnixWritePipeTransport fd=8 idle bufsize=0>, <WriteSubprocessPipeProto fd=0 pipe=<_UnixWritePipeTransport fd=8 idle bufsize=0>>)
DEBUG:asyncio:Read pipe 9 connected: (<_UnixReadPipeTransport fd=9 polling>, <ReadSubprocessPipeProto fd=1 pipe=<_UnixReadPipeTransport fd=9 polling>>) INFO:asyncio:run shell command '/local/p_check w_1.pcap --json': <_UnixSubprocessTransport pid=171289 running stdin=<_UnixWritePipeTransport fd=8 idle bufsize=0> stdout=<_UnixReadPipeTransport fd=9 polling>>
DEBUG:asyncio:<Process 171289> communicate: read stdout
INFO:asyncio:poll 4997.268 ms took 5003.093 ms: timeout
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
1个回答

12

loop.run_until_complete 接受等待对象:协程或者 future。您需要传递一个返回空值的函数的结果。

您应该修改 get_coros() 函数,它应该返回协程列表:

def get_coros():
    ...
    return coros

将该列表转换为可等待对象,以便逐个执行作业(或并行执行,如果需要)。例如:

async def main():
    for coro in get_coros():
        await coro

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

更新:

我现在无法测试我的猜测,但是我的猜测是:虽然asyncio.wait_for(fut, timeout=5)会在5秒后取消任务,但它不会终止进程。你可以手动完成这个操作:

try:
    await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
    p.kill()
    await p.communicate()

1
更新了上面的代码,我得到了相同的错误,只是当我在main()中循环而没有使用asyncio.as_completed时,我会得到这个_[ERROR] Task was destroyed but it is pending!_ - Nabz C
1
完美的。谢谢。 - Nabz C
1
在我的代码中,我使用asyncio.wait中的超时。它会导致wait函数在超时后返回,而不是抛出超时错误。待处理的作业(由asyncio.wait返回)可以重新添加并继续正常运行。"取消"任务是完全不同的事情。 - Ytsen de Boer
2
@YtsendeBoer 文档 明确指出:“如果超时发生,则取消任务并引发 asyncio.TimeoutError”。你可能将 asyncio.wait_forasyncio.wait 混淆了。后者行为如你所说,但与此答案无关。 - Mikhail Gerasimov
1
嗯,我现在正在查看我的代码,就像我告诉你的那样 :) 运行得非常好。但是你是对的,这已经离题了,抱歉。 - Ytsen de Boer
嘿 @NabzC,你能解决如何逃避[ERROR]任务已被销毁但仍待处理的问题吗? - Arnav

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接