声明:这是我第一次尝试使用 asyncio
模块。
我正在以下面的方式使用 asyncio.wait
,尝试支持一个等待一组异步任务全部完成的超时特性。这是一个较大的库的一部分,所以我省略了一些不相关的代码。
注意,该库已经支持使用 ThreadPoolExecutors 和 ProcessPoolExecutors 提交任务并使用超时,因此我并不真正关心建议改用这些方法或者为什么我要使用 asyncio
的问题。现在来看代码...
import asyncio
from contextlib import suppress
...
class AsyncIOSubmit(Node):
def get_results(self, futures, timeout=None):
loop = asyncio.get_event_loop()
finished, unfinished = loop.run_until_complete(
asyncio.wait(futures, timeout=timeout)
)
if timeout and unfinished:
# Code options in question would go here...see below.
raise asyncio.TimeoutError
起初,我并没有担心超时取消挂起任务的问题,但后来在程序退出或循环关闭时,我收到了警告:“任务已被销毁,但仍处于挂起状态!”。经过一番研究,我找到了多种取消任务并等待其实际取消的方法: 选项1:
[task.cancel() for task in unfinished]
for task in unfinished:
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
选项2:
[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))
选项3:
# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
await task
选项 4:
使用类似于这个答案中的 while 循环。看起来我的其他选项更好,但为了完整性而包括。
选项 1 和 2 看起来都可以正常工作。其中任何一种可能都是“正确”的,但是随着 asyncio
在多年间的演变,网络上的例子和建议要么过时,要么有很大的差异。因此,我的问题是...
问题 1
选项 1 和 2 之间是否存在实际差异?我知道 run_until_complete
将运行直到 future 完成,因此,由于选项 1 按特定顺序循环,所以如果较早的任务需要更长时间才能完成,它可能会表现出不同的行为。我尝试查看 asyncio 的源代码,以了解 asyncio.wait
是不是在幕后以相同方式有效地处理其任务/ future,但这并不明显。
问题 2
我假设,如果其中一个任务处于长时间阻塞操作的中间,它可能无法立即取消?也许这取决于底层操作或正在使用的库是否会立即引发 CancelledError?也许对于为 asyncio 设计的库来说这应该永远不会发生?
由于我在试图实现一个超时功能,所以对此比较敏感。如果这些事情可能需要很长时间才能取消,我会考虑调用 cancel
而不等待它真正发生,或者设置一个非常短的超时时间来等待取消完成。
问题 3
是否有可能 loop.run_until_complete
(或确切地说是对 async.wait
的调用)以除超时之外的其他原因返回 unfinished
中的值?如果是这样,我显然需要调整我的逻辑,但从文档来看,似乎不可能。