Python中的异步异常处理

80

我有下面这段代码,使用asyncioaiohttp来进行异步的HTTP请求。

import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))

    return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    task = asyncio.wait([fill_data(r) for r in runs])
    loop.run_until_complete(task)   
    return runs

try:
    get_runs()
except Exception as e:
    print(repr(e))
    sys.exit(1)

由于某些原因,在get函数内引发的异常不会被捕获:

Future/Task exception was never retrieved
Traceback (most recent call last):
  File "site-packages/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "mwe.py", line 25, in fill_data
    run['data'] = yield from get(url)
  File "mwe.py", line 17, in get
    raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'

那么,正确处理由协程引发的异常的方法是什么?

2个回答

89

asyncio.wait并不会真正消耗传递给它的Futures,它只是等待它们完成,然后返回Future对象:

协程 asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

等待由序列futures指定的Futures和协程对象完成。协程将被包装在任务中。返回两组Future: (done, pending)。

在实际使用中,直到您从done列表中yield from/await这些项目之一,它们才会保持未使用状态。由于程序退出而没有消费这些Futures,所以会看到“未检索到异常”的消息。

对于您的用例,可能更合理的做法是使用asyncio.gather,它将实际消耗每个Future,然后返回一个聚合所有结果的单个Future(或引发输入列表中第一个Future抛出的Exception)。

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[fill_data(r) for r in runs])
    loop.run_until_complete(tasks)
    return runs

输出:

GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)

请注意,asyncio.gather 实际上允许您在其中一个 future 抛出异常时自定义其行为。默认行为是抛出第一个遇到的异常,但它也可以只返回输出列表中的每个异常对象:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

返回一个将协程对象或 future 聚合结果的 future。

所有 futures 必须共享同一个事件循环。如果所有任务都成功完成,则返回的 future 的结果是结果列表(按照原始序列的顺序,而不一定是结果到达的顺序)。如果 return_exceptionsTrue,则任务中的异常会被视为成功的结果,并在结果列表中汇总;否则,第一个引发的异常将立即传播到返回的 future。


2
感谢您的解释,文档对于异常处理并不十分清晰。 - Yury Bayda
1
那么如何使用wait来实现呢?是像这样yield from asyncio.wait(...)吗?await asyncio.wait(...)也可以吗? - z0r
顺便提一下,如果协程中的一个从未实际启动,也会出现同样的问题。我用两个协程打开一些套接字(手动),并尝试 await <loop>.sock_recv(<socket>, <size>)。如果套接字未切换到非阻塞模式(使用 <socket>.setblocking()),第二个协程将不会启动,而 KeyboardInterrupt 将导致 "Task exception was never retrieved"。 - doak
在异步函数内部,不能使用 yield from 来解决 future 或 task。应该使用 await。这对我来说不是很清楚。 - Rugnar
1
感谢提供 return_exceptions 提示。由于进行了数百个头请求,我不希望因其中一个超时而抛出异常。 - pouya

6

要调试或“处理” 回调函数中的异常:

返回某些结果或引发异常的协程:

@asyncio.coroutine
def async_something_entry_point(self):
    try:
        return self.real_stuff_which_throw_exceptions()
    except:
        raise Exception(some_identifier_here + ' ' + traceback.format_exc())

并回调:

def callback(self, future: asyncio.Future):
    exc = future.exception()
    if exc:
        # Handle wonderful empty TimeoutError exception
        if type(exc) == TimeoutError:
            self.logger('<Some id here> callback exception TimeoutError')
        else:
            self.logger("<Some id here> callback exception " + str(exc))

    # store your result where you want
    self.result.append(
        future.result()
    )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接