异步IO RuntimeError: 事件循环已关闭

22

我正在尝试使用Asyncio和aiohttp库进行大量请求(约1000个),但我遇到了一个问题,找不到太多关于它的资料。

当我使用10个网址运行此代码时,它可以正常运行。但是当我使用100个以上的网址运行它时,它会出现错误并显示 RuntimeError: Event loop is closed

import asyncio
import aiohttp


@asyncio.coroutine
def get_status(url):
    code = '000'
    try:
        res = yield from asyncio.wait_for(aiohttp.request('GET', url), 4)
        code = res.status
        res.close()
    except Exception as e:
        print(e)
    print(code)


if __name__ == "__main__":
    urls = ['https://google.com/'] * 100
    coros = [asyncio.Task(get_status(url)) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(coros))
    loop.close()

堆栈跟踪可以在这里找到。

非常感谢任何帮助或见解,因为我已经为此烦恼了几个小时。显然,这表明应该仍然开启的事件循环已关闭,但我不知道这是如何可能的。


不是 Asyncio 错误。Python 递归错误,已达到限制。需要为所有非类函数使用线程... - dsgdfg
首先,请确保您正在使用最新的aiohttp版本。 我假设您已经这样做了。 从技术上讲,aiohttp在完成请求后需要进行一次循环迭代以关闭底层套接字。 因此,在调用loop.close()之前插入loop.run_until_complete(asyncio.sleep(0)) - Andrew Svetlov
您的回溯信息表明,通过run_in_executor提交给Executor的作业在循环关闭后返回。奇怪的是,aiohttpasyncio都没有使用run_in_executor... - Vincent
@AndrewSvetlov,感谢您的回复-我尝试在关闭之前睡觉,但仍然没有成功...还有其他想法吗? - Patrick Allen
@Vincent 技术上来说,它们确实可以,DNS解析是由run_in_executor执行的--但应在完成get_status任务之前完成。 - Andrew Svetlov
对于使用Python的异步Socket.IO的任何人,请确保在您的主函数中运行await sio.wait() - LeoDog896
3个回答

18

这个漏洞已经被记录在https://github.com/python/asyncio/issues/258 请继续关注。

作为一个快速的解决方法,我建议使用自定义执行器,例如:

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(5)
loop.set_default_executor(executor)

在完成您的程序之前,请执行以下操作

executor.shutdown(wait=True)
loop.close()

1
太棒了,安德鲁,感谢你的帮助。我没意识到我在和团队中的一部分交流 : )。在 GitHub 上继续关注这个问题。 - Patrick Allen
Changed in version 3.5.3: BaseEventLoop.run_in_executor() no longer configures the max_workers of the thread pool executor it creates - RomanPerekhrest
安德鲁,你能否提供一些稳健的解决方案而不是“快速解决方案”来解决Python 3.5的问题? - RomanPerekhrest

7

你是对的,loop.getaddrinfo 使用 ThreadPoolExecutor 在线程中运行 socket.getaddrinfo

你正在使用带有超时的 asyncio.wait_for,这意味着 res = yield from asyncio.wait_for... 将在4秒后引发一个 asyncio.TimeoutError。然后,get_status 协程将返回 None 并停止循环。如果一个任务在此之后完成,它将尝试在事件循环中安排回调,并因为已经关闭而引发异常。


啊,这有道理,但这是我找到的唯一实现请求超时的方法。你知道有没有一种方法可以在不关闭循环的情况下超时? - Patrick Allen
@PatrickAllen 你可能想要增加默认为5的工作线程数 - Vincent
2
@PatrickAllen 或者在关闭循环之前使用 loop._default_executor.shutdown(wait=True) - Vincent
我会将这个标记为已回答,因为这似乎已经解决了原始问题。我应该限制最大连接数吗?似乎请求无缘无故超时了。也许我请求得太快了? - Patrick Allen
@PatrickAllen 嗯,有5个工作线程和一千个请求意味着你正在尝试在4秒内运行200个 socket.getaddrinfo,这对我来说似乎是合理的,尽管工作线程数可以增加。您还可以为 request 提供自定义的 TcpConnector 以指定连接超时时间:connector=aiohttp.TCPConnector(loop=loop, force_close=True, conn_timeout=1) - Vincent

0

这是解释器中的一个错误。幸运的是,它在3.10.6中最终得到了修复,所以您只需要更新已安装的Python即可。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接