使用asyncio处理超时

5

声明:这是我第一次尝试使用 asyncio 模块。

我正在以下面的方式使用 asyncio.wait,尝试支持一个等待一组异步任务全部完成的超时特性。这是一个较大的库的一部分,所以我省略了一些不相关的代码。

注意,该库已经支持使用 ThreadPoolExecutors 和 ProcessPoolExecutors 提交任务并使用超时,因此我并不真正关心建议改用这些方法或者为什么我要使用 asyncio 的问题。现在来看代码...

import asyncio
from contextlib import suppress

... 

class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError

起初,我并没有担心超时取消挂起任务的问题,但后来在程序退出或循环关闭时,我收到了警告:“任务已被销毁,但仍处于挂起状态!”。经过一番研究,我找到了多种取消任务并等待其实际取消的方法: 选项1:
[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)

选项2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))

选项3:
# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task

选项 4:

使用类似于这个答案中的 while 循环。看起来我的其他选项更好,但为了完整性而包括。


选项 1 和 2 看起来都可以正常工作。其中任何一种可能都是“正确”的,但是随着 asyncio 在多年间的演变,网络上的例子和建议要么过时,要么有很大的差异。因此,我的问题是...

问题 1

选项 1 和 2 之间是否存在实际差异?我知道 run_until_complete 将运行直到 future 完成,因此,由于选项 1 按特定顺序循环,所以如果较早的任务需要更长时间才能完成,它可能会表现出不同的行为。我尝试查看 asyncio 的源代码,以了解 asyncio.wait 是不是在幕后以相同方式有效地处理其任务/ future,但这并不明显。

问题 2

我假设,如果其中一个任务处于长时间阻塞操作的中间,它可能无法立即取消?也许这取决于底层操作或正在使用的库是否会立即引发 CancelledError?也许对于为 asyncio 设计的库来说这应该永远不会发生?

由于我在试图实现一个超时功能,所以对此比较敏感。如果这些事情可能需要很长时间才能取消,我会考虑调用 cancel 而不等待它真正发生,或者设置一个非常短的超时时间来等待取消完成。

问题 3

是否有可能 loop.run_until_complete(或确切地说是对 async.wait 的调用)以除超时之外的其他原因返回 unfinished 中的值?如果是这样,我显然需要调整我的逻辑,但从文档来看,似乎不可能。

1个回答

4
有没有选项1和选项2之间的实际差别呢?
没有。选项2看起来更漂亮,可能会稍微更有效率,但它们的净效果是相同的。
我知道run_until_complete将一直运行,直到未来完成,因此,由于选项1正在按特定顺序循环,如果早期任务需要较长时间才能真正完成,它可能会有所不同。
乍一看似乎是这样,但实际上并非如此,因为loop.run_until_complete运行了提交到循环中的所有任务,而不仅仅是作为参数传递的任务。它只是在提供的可等待对象完成后停止 - 这就是“运行直到完成”的含义。在已经安排好的任务上调用run_until_complete的循环就像以下异步代码:
ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t

这句话的语义与以下线程代码等价:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()

换句话说,await trun_until_complete(t) 会阻塞直到 t 完成,但允许之前使用 asyncio.create_task() 调度的任务在此期间运行。因此,总运行时间将等于最长任务的运行时间,而不是它们的总和。例如,如果第一个任务花费了很长时间,所有其他任务将在此期间完成,它们的等待将不会休眠。
所有这些仅适用于等待先前已安排的任务。如果您尝试将其应用于协程,则无法正常工作:
# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()

对于asyncio的初学者来说,这经常是一个难点,他们编写等效于最后一个asyncio示例的代码,并期望它并行运行。

我尝试查看asyncio源代码,了解asyncio.wait是否只在其任务/未来下执行相同的操作,但这不是很明显。

asyncio.wait 只是一个方便的API,它执行两个操作:

  • 将输入参数转换为实现 Future 的内容。对于协程,这意味着它将它们提交到事件循环,就像使用 create_task 一样,这使它们可以独立运行。如果您一开始就给它任务,就像你做的那样,这一步将被跳过。
  • 使用 add_done_callback 在future完成时得到通知,此时它会恢复其调用者。

所以,是的,它执行相同的操作,但具有不同的实现,因为它支持更多功能。

我假设如果其中一个任务正在进行长时间的阻塞操作,它可能不会立即取消?

在asyncio中,不应该有“阻塞”操作,只有暂停操作,它们应该立即取消。唯一的例外是使用 run_in_executor 将阻塞代码附加到asyncio中,在这种情况下,底层操作根本不会取消,但asyncio协程将立即获得异常。

也许这取决于正在使用的底层操作或库是否立即引发CancelledError?

库不会“引发” CancelledError,它在暂停取消发生的位置接收它。对于库来说,取消的效果是 await ...打断它的等待并立即引发CancelledError。除非被捕获,否则该异常将通过函数和await调用传播至顶级协程,其引发CancelledError 标记整个任务已取消。良好行为的asyncio代码将仅如此,可能使用finally 释放它们持有的OS级资源。当捕获CancelledError 时,代码可以选择不重新引发它,在这种情况下,取消实际上被忽略。

loop.run_until_complete(或者确切地说,对async.wait的底层调用)是否可能以未完成的原因而返回值,而不是超时?

如果您使用的是return_when= asyncio.ALL_COMPLETE (默认值),那么不应该存在这种可能性。如果使用return_when= FIRST_COMPLETED ,则独立于超时而显然可能发生。


谢谢,非常有帮助。我有一个关于你第一个"Threading example"的快速问题/澄清...我猜你只是展示了一种类似但并不完全相同的使用线程的方式。我的假设是asyncio在后台不进行任何多线程操作,只是想确保我没有误解。 - totalhack
@totalhack 在所有方面都是正确的。Asyncio 是非常单线程的,其实现完全不同。正如您所猜测的那样,我旨在通过将等效概念与线程中的概念进行类比来解释这些概念。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接