Python asyncio 强制超时

11
使用asyncio,协程可以执行超时操作,因此在超时后它会被取消:

使用asyncio,协程可以在超时后被取消:

@asyncio.coroutine
def coro():
    yield from asyncio.sleep(10)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 5))

上面的例子按预期运行(5秒后超时)。

然而,当协程没有使用asyncio.sleep()(或其他asyncio协程)时,它似乎不会超时。例如:

@asyncio.coroutine
def coro():
    import time
    time.sleep(10)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 1))

这段代码运行时间超过10秒,因为time.sleep(10)没有被取消。在这种情况下,有可能强制取消协程吗?

如果要使用asyncio解决这个问题,我该怎么做?

3个回答

23

不,除非协程将控制权交还给事件循环,否则你不能中断协程,这意味着它需要在yield from调用内部。 asyncio是单线程的,因此当您在第二个示例中阻塞time.sleep(10)调用时,事件循环无法运行。这意味着当您使用wait_for设置的超时到期时,事件循环将无法对其采取行动。事件循环直到coro退出才有机会再次运行,此时为时已晚。

这就是为什么通常情况下,您应该始终避免任何没有异步操作的阻塞调用;每当调用在不向事件循环产生输出的情况下阻塞时,您的程序中的其他所有内容都无法执行,这可能不是您想要的。如果您确实需要进行长时间的阻塞操作,则应尝试使用BaseEventLoop.run_in_executor 将其在线程或进程池中运行,以避免阻塞事件循环:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def coro(loop):
    ex = ProcessPoolExecutor(2)
    yield from loop.run_in_executor(ex, time.sleep, 10)  # This can be interrupted.

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(loop), 1))

这里还有一个有用的例子:https://github.com/calebmadrigal/asyncio-examples/blob/master/run_in_executor.py - shrx

3

感谢@dano的回答。如果运行协程不是硬性要求,这里提供一种重制、更精简的版本。

import asyncio, time

timeout = 0.5
loop = asyncio.get_event_loop()
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout)
try:
    loop.run_until_complete(future)
    print('Thx for letting me sleep')
except asyncio.exceptions.TimeoutError:
    print('I need more sleep !')

对于好奇的人,我的Python 3.8.2中进行一些调试显示,将None作为执行者传递会创建一个_default_executor,如下所示:

self._default_executor = concurrent.futures.ThreadPoolExecutor()

Python进程在TimeoutError之后继续运行。是否有办法使Python程序在except块被触发时退出? - Justin
1
@Justin 谢谢你的评论,让我更新了我的 Python 3.8 的答案(在 except 中捕获不同的类)。至于你的问题,让错误自由地传递确实会让解释器停止运行(要么完全删除 except,要么在其末尾使用 raise)。 - Arnaud P

0
我看到的超时处理示例都非常简单。实际情况下,我的应用程序要复杂一些。流程如下:
  1. 当客户端连接到服务器时,服务器会创建另一个连接到内部服务器
  2. 当内部服务器连接正常时,等待客户端发送数据。根据这些数据,我们可能会向内部服务器发出查询。
  3. 当有数据要发送到内部服务器时,请发送它。由于内部服务器有时无法快速响应,因此将此请求包装成超时。
  4. 如果操作超时,则折叠所有连接以向客户端发出错误信号
为了实现上述所有内容,并保持事件循环运行,结果代码包含以下代码:
def connection_made(self, transport):
    self.client_lock_coro = self.client_lock.acquire()
    asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock)

def _got_client_lock(self, task):
    task.result() # True at this point, but call there will trigger any exceptions
    coro = self.loop.create_connection(lambda: ClientProtocol(self),
                                           self.connect_info[0], self.connect_info[1])
    asyncio.ensure_future(asyncio.wait_for(coro,
                                           self.client_connect_timeout
                                           )).add_done_callback(self.connected_server)

def connected_server(self, task):
    transport, client_object = task.result()
    self.client_transport = transport
    self.client_lock.release()

def data_received(self, data_in):
    asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout))

def send_to_real_server(self, message, timeout=5.0):
    yield from self.client_lock.acquire()
    asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message),
                                                   timeout, loop=self.loop)
                                  ).add_done_callback(self.sent_to_real_server)

@asyncio.coroutine
def _send_to_real_server(self, message):
    self.client_transport.write(message)

def sent_to_real_server(self, task):
    task.result()
    self.client_lock.release()

1
这个回答似乎并没有回答实际问题,我也认为这并没有什么帮助。(因此我给它点了踩。)在我看来,代码中做了太多无关的事情,而实际的超时处理并没有清晰地展示出来。希望这个反馈能有所帮助。 - siebz0r
1
感谢您的反馈。实际问题是关于协程能否在超时后执行,而我的代码确实可以做到这一点。正如我在答案中所述,在整个互联网上找不到使用loop.run_until_complete()之外的超时执行协程的代码,这就是为什么我发布了这篇文章的原因。另外,鉴于限制条件,方法/函数的数量似乎是必须的。请随意提供更优化的代码。 - Jari Turkia

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接