异步IO网络爬虫101:使用aiohttp获取多个URL

26

在早些时候的问题中,aiohttp 的一位作者提出了一种使用 Python 3.5 中的新 async with 语法来获取多个 URL 的方法。

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.wait([loop.create_task(fetch(session, url))
                                  for url in urls])
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
            'http://google.com',
            'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))
        # do something with the the_results

然而,当session.get(url)请求之一失败时(如上述因http://SDFKHSKHGKLHSKLJHGSDFKSJH.com),错误未被处理,整个程序就会崩溃。

我寻找了关于session.get(url)结果插入测试的方法,例如寻找一个 try ... except ... 的地方,或者一个 if response.status!= 200:的地方,但我只是不明白如何使用async withawait和各种对象。

由于async with仍然非常新,因此没有很多示例。如果一个asyncio专家能够展示如何做到这一点,那将对许多人非常有帮助。毕竟,大多数人想要使用asyncio测试的第一件事情之一就是同时获取多个资源。

目标

目标是我们可以检查the_results并快速地查看以下内容:

  • 此URL失败了(以及原因:状态代码,也许是异常名称);或者
  • 此URL有效,并且这里有一个有用的响应对象。
2个回答

26

我建议使用 gather 来替代 wait,因为它可以返回异常对象而不会抛出异常。这样你就可以检查每个结果是否是某个异常的实例。

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.gather(
        *[fetch(session, url) for url in urls],
        return_exceptions=True  # default is false, that would raise
    )

    # for testing purposes only
    # gather returns results in the order of coros
    for idx, url in enumerate(urls):
        print('{}: {}'.format(url, 'ERR' if isinstance(results[idx], Exception) else 'OK'))
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = [
        'http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
        'http://google.com',
        'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))

测试:

$python test.py 
http://SDFKHSKHGKLHSKLJHGSDFKSJH.com: ERR
http://google.com: OK
http://twitter.com: OK

太棒了,非常感谢!我需要消化一下这个信息,但在稍微尝试一下后,它似乎非常灵活。+1,接受。:) - Hans Schindler
1
很棒的答案。我有一个疑问,既然你在执行asyncio.gather后立即迭代结果,那么在fetch列表上使用asyncio.as_completed是否更好呢?这样你就可以立即迭代已完成的任务,而不必等待所有任务都完成。 - dalanmiller
@dalanmiller:它需要异常处理,就像Padraic Cunningham的回答一样。但是如果您需要立即获得每个Future的结果,那么这就是方法。 - kwarunek

9

我不是asyncio的专家,但如果你想捕获错误,你需要捕获一个socket错误:

async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                print(response.status == 200)
                return await response.text()
        except socket.error as e:
            print(e.strerror)

运行代码并打印the_results:
Cannot connect to host sdfkhskhgklhskljhgsdfksjh.com:80 ssl:False [Can not connect to sdfkhskhgklhskljhgsdfksjh.com:80 [Name or service not known]]
True
True
({<Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result='<!DOCTYPE ht...y>\n</html>\n'>, <Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result=None>, <Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result='<!doctype ht.../body></html>'>}, set())

你可以看到我们捕获了错误,而且后续的调用仍然成功返回HTML。
我们应该真正地捕获一个OSError,因为自Python 3.3以来,socket.error已被弃用为OSError的别名
async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                return await response.text()
        except OSError as e:
            print(e)

如果您想检查响应是否为200,也可以将if语句放在try中,并且您可以使用reason属性获取更多信息:
async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    print(response.reason)
                return await response.text()
        except OSError as e:
            print(e.strerror)

非常感谢!两个很棒的答案,我希望我能都选上。选择@kwarunek的,因为它可以直接使用,但+1,我会去找你们最好的两个答案来点赞。 :) - Hans Schindler

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接