检测空闲的 asyncio 事件循环

7

有没有一种编程模式能够让我检测到 asyncio 事件循环何时处于空闲状态?假设我的执行路径以某种复杂的方式分支,例如使用 asyncio.gather(),但我知道每个分支最终都会等待一些空闲的协程,例如套接字或子进程。假设我知道这些协程实际上从不暂停,因此事件循环将执行它可以执行的任何 Python 代码,但最终只会等待这些空闲的协程。是否有一种编程方法来检测这种状态并停止循环?


1
为什么需要停止循环?如果在套接字或子进程作业完成之前停止,它将无法将结果传播到等待它的协程。 - Mikhail Gerasimov
那些永不停歇的协程具有副作用,我希望它们被执行。但是一旦它们全部执行完毕,我希望应用程序停止。这是应用程序的一个特定用例场景,在该场景中,执行路径被简单地中断。 - jhrmnn
你能提供一些包含这些从未yield的协程的可重现代码吗?你想要检测它们吗? - Mikhail Gerasimov
1个回答

5
正如Philip Couling指出的那样,下面提供的解决方案无效。StackOverflow不允许删除已接受的答案,因此我在此添加此免责声明。
您所谓的“空闲”可能更准确地描述为“等待IO或超时”。 在编写正确的asyncio代码中,不需要检测循环处于该状态,因为它不应该问题-循环正在执行其工作,并且由像 asyncio.gather , asyncio.wait 和 loop.run_until_complete 这样的工具来确保它在适当的时间结束。 但是,事情并不总是完美的,如果您真的想这样做,这当然是可能的。
事件循环的每个步骤都会检查是否有准备好运行的任务。 如果有任何任务,则调用其步骤。 一旦没有更多的任务就绪,事件循环就会等待IO事件或最快超时,以先发生的为准。 值得注意的是,运行任务始终优先于等待IO。 因此,要检测没有任务就绪的情况,可以安排已知会立即触发的虚拟IO事件。
以下协程设置了这样的事件并等待其触发:
import socket, asyncio

async def detect_iowait():
    loop = asyncio.get_event_loop()
    rsock, wsock = socket.socketpair()
    wsock.close()
    await loop.sock_recv(rsock, 1)
    rsock.close()

它建立了一个套接字对,其中从一个套接字读取的数据将返回写入另一个套接字的数据。它立即关闭其中一个套接字,以便从另一个套接字读取将立即返回EOF,表示为空字节数组。等待从该套接字读取是基本上不会阻塞的 - 但是asyncio并不知道这一点,因此它将套接字放在IO等待列表中。如上所述,只要不存在可运行的任务,asyncio就会等待IO,detect_iowait将等待套接字的读取并退出。因此等待detect_iowait()本身可以检测IO等待。

使用detect_iowait()的测试代码可能像这样:

# stop loop.run_forever once iowait is detected
async def stop_on_iowait():
    await detect_iowait()
    print('iowait detected, stopping!')
    asyncio.get_event_loop().stop()

# a dummy calculation coroutine, emulating your execution path
async def calc(n):
    print('calc %d start' % n)
    async def noop():
        pass
    for i in range(n):
        await noop()
    print('calc %d end' % n)

# coroutine that waits on IO forever, also (ab)using a socket pair,
# this time creating a socket whose recv will never complete
async def io_forever():
    loop = asyncio.get_event_loop()
    sock, _ = socket.socketpair()
    sock.setblocking(False)
    await loop.sock_recv(sock, 1)

loop = asyncio.get_event_loop()
for t in calc(1000), calc(10000), calc(100000), io_forever():
    loop.create_task(t)
loop.create_task(stop_on_iowait())
loop.run_forever()

这在 Python 3.8 中至少不起作用。这里的示例代码实际上并没有测试事件循环是否处于空闲状态。原因是测试代码根本没有挂起。await noop() 不足以挂起任务。当我使用实际挂起的任务(Python 3.8)运行时,它不起作用。我在这里发布了一个后续问题,并提供了示例代码:https://dev59.com/vFMHtIcB2Jgan1zn7HRj - Philip Couling

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接