使用Python requests进行异步请求

227

我尝试了Python的requests库文档中提供的示例。

使用async.map(rs),我可以获取响应代码,但我想要获取每个请求页面的内容。例如,下面的方式不起作用:

out = async.map(rs)
print out[0].content

也许你收到的响应体是空的? - Mariusz Jamro
可以,但请把你收到的完整错误信息发布出来。 - Chewie
没有错误,只是通过提供的测试URL无限运行。 - trbck
当我使用https的url时,它显然会出现问题。http却可以正常工作。 - trbck
10
大多数答案已经过时了。在2021年,当前的潮流效应赢家是:https://docs.aiohttp.org/en/stable/ - guettli
显示剩余2条评论
16个回答

191

注意

以下的答案不适用于请求库v0.13.0及以上版本。该异步功能已经被移至grequests,因此该问题被提出后。但是,您可以将下面的requests替换为grequests,然后就可以正常工作了。

我将保留此答案,以反映最初的问题,即如何使用请求库小于v0.13.0。


要使用async.map 异步地执行多个任务,您需要:

  1. 定义一个函数来处理您想要对每个对象执行的任务
  2. 将该函数作为事件钩子添加到您的请求中
  3. 调用async.map以处理所有请求/操作的列表

示例:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

2
很好的想法留下你的评论:由于最新请求和grequests之间的兼容性问题(在请求1.1.0中缺少max_retries选项),我不得不降级请求以检索异步,并且我发现异步功能已经移动到版本0.13+(https://pypi.python.org/pypi/requests)。 - outforawhile
12
from grequests import async 无效,而这个函数定义 def do_something(response, **kwargs): 对我有用。我是从 https://dev59.com/rHDXa4cB1Zd3GeqP7w3A 找到的。 - Allan Ruin
4
如果异步地调用async.map仍然会阻塞,那么这怎么是异步的呢?除了请求本身被异步发送之外,检索仍然是同步的吗? - bryanph
5
ه°†from requests import asyncو›؟وچ¢ن¸؛import grequests as asyncه¯¹وˆ‘وœ‰و•ˆم€‚ - Martin Thoma
4
grequests现在建议使用requests-threadsrequests-futures - OrangeDog
显示剩余5条评论

106

async现在是一个独立模块:grequests

请参见这里:https://github.com/spyoungtech/grequests

还有这里:Ideal method for sending multiple HTTP requests over Python?

安装:

$ pip install grequests

用法:

构建一个堆栈:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

发送堆栈
grequests.map(rs)

结果看起来像

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests似乎没有设置并发请求的限制,即当多个请求发送到同一台服务器时。

11
关于并发请求限制,您可以在运行map()/imap()时指定池的大小。例如:grequests.map(rs, size=20) 可以同时处理20个请求。 - synthesizerpatel
3
目前该软件无法在Python3上运行(gevent在Python 3.4上构建v2.6失败)。 - saarp
1
我不太理解async的部分。如果我让results = grequests.map(rs),那么这行代码之后的代码就会被阻塞,我能看到异步效果吗? - Allan Ruin
3
在 GitHub 的 repo 中,grequests 的作者建议使用 requests-threads 或 requests-futures 代替。请注意,我的翻译尽可能保留原意,同时使内容更加通俗易懂,并未添加任何解释或额外信息。 - theberzi
1
有人知道响应列表是按照响应时间排序还是按照我们在map函数中传递的URL列表的相同顺序排序吗? - luisvenezian
有人知道响应列表是按响应时间排序还是按我们在map函数中传递的url列表的相同顺序排序吗? - luisvenezian

60

我测试了两个库:requests-futuresgrequests。grequests更快,但会带来猴子补丁和额外的依赖问题;requests-futures比grequests慢好几倍。于是我决定自己写代码,只需将requests包装到ThreadPoolExecutor即可,速度几乎与grequests一样,而且没有外部依赖。

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

谢谢,但我不明白为什么你要为请求创建一个超时时间? - Slow Harry
2
抱歉,我不理解你的问题。在多个线程中仅使用单个URL?仅适用于一种情况下的DDoS攻击)) - Hodza
27
我不理解为什么这个答案会有这么多赞。提问者的问题是关于异步请求的。ThreadPoolExecutor可以运行线程。是的,你可以在多个线程中发出请求,但那永远不会成为一个异步程序,所以我怎么能把它作为原始问题的答案呢? - nagylzs
2
实际上,这个问题是关于如何并行加载URL的。是的,线程池执行器不是最好的选择,最好使用异步IO,但在Python中它也能很好地工作。我不明白为什么线程不能用于异步?如果您需要异步运行CPU绑定任务怎么办? - Hodza
1
我更喜欢使用事件循环而不是线程池。 - iedmrc
显示剩余7条评论

55

很不幸,据我所知,requests库并不支持执行异步请求。您可以在requests周围包装async/await语法,但这仍然会使底层请求变得同步。如果您想要真正的异步请求,必须使用其他提供此功能的工具。其中之一是aiohttp(Python 3.5.3+)。我使用Python 3.7的async/await语法时,它表现良好。以下是使用以下三种方法执行 n 次网络请求:

  1. 纯同步请求(sync_requests_get_all),使用Python requests
  2. 使用Python 3.7的async/await语法和asyncio包装的同步请求(async_requests_get_all),使用Python requests
  3. 使用Python 3.7的async/await语法和asyncio包装的真正异步实现(async_aiohttp_get_all),使用Python aiohttp
"""
Tested in Python 3.5.10
"""

import time
import asyncio
import requests
import aiohttp

from asgiref import sync

def timed(func):
    """
    records approximate durations of function calls
    """
    def wrapper(*args, **kwargs):
        start = time.time()
        print('{name:<30} started'.format(name=func.__name__))
        result = func(*args, **kwargs)
        duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
            name=func.__name__, elapsed=time.time() - start
        )
        print(duration)
        timed.durations.append(duration)
        return result
    return wrapper

timed.durations = []


@timed
def sync_requests_get_all(urls):
    """
    performs synchronous get requests
    """
    # use session to reduce network overhead
    session = requests.Session()
    return [session.get(url).json() for url in urls]


@timed
def async_requests_get_all(urls):
    """
    asynchronous wrapper around synchronous requests
    """
    session = requests.Session()
    # wrap requests.get into an async function
    def get(url):
        return session.get(url).json()
    async_get = sync.sync_to_async(get)

    async def get_all(urls):
        return await asyncio.gather(*[
            async_get(url) for url in urls
        ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)

@timed
def async_aiohttp_get_all(urls):
    """
    performs asynchronous get requests
    """
    async def get_all(urls):
        async with aiohttp.ClientSession() as session:
            async def fetch(url):
                async with session.get(url) as response:
                    return await response.json()
            return await asyncio.gather(*[
                fetch(url) for url in urls
            ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)


if __name__ == '__main__':
    # this endpoint takes ~3 seconds to respond,
    # so a purely synchronous implementation should take
    # little more than 30 seconds and a purely asynchronous
    # implementation should take little more than 3 seconds.
    urls = ['https://postman-echo.com/delay/3']*10

    async_aiohttp_get_all(urls)
    async_requests_get_all(urls)
    sync_requests_get_all(urls)
    print('----------------------')
    [print(duration) for duration in timed.durations]
在我的电脑上,这是输出结果:
async_aiohttp_get_all          started
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         started
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          started
sync_requests_get_all          finished in 30.59 seconds
----------------------
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          finished in 30.59 seconds

"asnyc" 是一个打字错误还是故意的? - guettli
4
肯定是打字错误。 - DragonBobZ
你的 async_aiohttp_get_all() 是一个不错的解决方案。我也想到了类似的东西,但是在它外面多了一个 async def fetch_all(urls): return await asyncio.gather(*[fetch(url) for url in urls]),这导致我的解决方案为每个URL创建单独的 aiohttp.ClientSession() 实例,而通过嵌入本地函数,您可以重用相同的会话……在我看来更具有 Python 风格。您能否提醒我使用 sync.async_to_sync() 与存在 get_all() 相比,使用 asyncio.run() without get_all() 的好处是什么? - wescpy
做得非常棒,肯定是async_aiohttp比所有其他工具都表现更好! - Tuhin Mitra
这个纯aiohttp版本是不是只有我一个人觉得它使用了asgiref.sync.async_to_sync来运行?有没有一种方法可以在不包含额外模块的情况下完成这个操作? - CpILL
2
@CpILL 它包装了一个返回协程的函数(即 asyncio.gather 的结果),以便可以在同步上下文中调用它。我喜欢这样做。你也可以使用例如 asyncio.run 直接执行 asyncio.gather 的结果。 - DragonBobZ

35
也许requests-futures是另一个选择。
from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

官方文档中也推荐使用此方法。如果您不想使用gevent,这是一个好的选择。


1
最简单的解决方案之一。可以通过定义max_workers参数来增加并发请求的数量。 - Jose Cherian
1
能够看到一个例子进行缩放,这样我们就不需要为每个要循环的项使用一个变量名了,那会很好。 - user1717828
每个请求都有一个线程是一种极大的资源浪费!同时进行500个请求是不可能的,它会占用你的CPU。这绝不应被视为一个好的解决方案。 - Corneliu Maftuleac
@CorneliuMaftuleac 很好的观点。关于线程使用,你肯定需要关心它,而该库提供了启用线程池或处理池的选项。ThreadPoolExecutor(max_workers=10) - Dreampuf
@Dreampuf的进程池我认为更糟糕? - Corneliu Maftuleac
显示剩余2条评论

18

您可以使用httpx来实现。

import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
如果你想要一个功能性语法,gamla 库可以将其封装到 get_async 中。

然后你就可以这样做:


await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])

10是超时时间,单位为秒。

(免责声明:我是作者)


还有 respx 用于模拟/测试 :) - rlat
嗨@Uri,我在尝试你在这个答案中提到的代码时遇到了以下错误。 await asyncio.gather(*map(get_async, urls)) ^ SyntaxError: invalid syntax 请指导。 - AJ.
1
请注意,您需要一个异步上下文才能使用await - Uri

16

我对大多数发布的答案有很多问题 - 它们要么使用已经被移植过来并带有有限功能的废弃库,要么提供了在请求执行过程中具有太多魔法的解决方案,使得错误处理变得困难。如果它们不属于以上任一类别,则是第三方库或已过时的。

其中一些解决方案仅适用于HTTP请求,在任何其他类型的请求上都表现不佳,这是荒唐的。这里不需要高度定制的解决方案。

仅使用Python内置库asyncio就足以执行任何类型的异步请求,并为复杂的和具体用例特定的错误处理提供足够的流动性。

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

它的工作原理很简单。您正在创建一系列希望异步发生的任务,然后要求循环执行这些任务,并在完成后退出。没有额外的库需要维护,也没有必要缺乏功能。


11
如果我理解正确的话,这将在执行GRPC和HTTP调用时阻塞事件循环?因此,如果这些调用需要几秒钟才能完成,你的整个事件循环将会被阻塞几秒钟?为了避免这种情况,你需要使用支持异步操作的GRPC或HTTP库。然后,你可以使用类似于await response = requests.get(URL)的代码。是这样吗? - Coder Nr 23
1
不幸的是,当我尝试这个方法时,我发现对requests进行包装几乎比同步调用URL列表要慢(在某些情况下甚至更慢)。例如,使用上述策略请求一个响应时间为3秒的端点10次大约需要30秒。如果您想要真正的异步性能,您需要使用像aiohttp这样的东西。 - DragonBobZ
1
@arshbot 如果你的任务是异步的,那么尽管等待synchronous calls中的requests.get,你仍然会看到加速。但问题是如何使用Python requests库执行异步请求。这个答案没有解决这个问题,所以我的批评依旧。 - DragonBobZ
1
我认为应该进行提升。使用异步事件循环似乎足以触发异步请求,无需安装外部依赖。 - iedmrc
1
@iedmrc,遗憾的是,情况并非如此。要使任务非阻塞,必须使用Python中较新的异步工具来实现,而请求库并不支持这种方式。如果您只是将请求任务放入异步事件循环中,它们仍然会被阻塞。话虽如此,您可以(如其他回答所建议的)使用gevent或线程与请求一起使用,但绝对不能使用asyncio。 - Sergio Chumacero
显示剩余2条评论

7

4
确认,效果很棒。在项目页面上,它说这个工作已被下面的项目所取代 https://github.com/encode/httpx - nurettin

7

我知道这篇文章已经关闭了一段时间,但是我认为推广另一个基于requests库构建的异步解决方案可能会很有用。

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

文档在这里:http://pythonhosted.org/simple-requests/

@YSY 欢迎随时在 https://github.com/ctheiss/simple-requests/issues 上发布问题;我每天都会使用这个库数千次。 - Monkey Boson
波士顿,你如何处理404/500错误?HTTPS网址怎么办?能否提供支持数千个网址的代码片段?请贴一个例子好吗?谢谢。 - YSY
@YSY 默认情况下,404/500错误会引发异常。这种行为可以被覆盖(参见http://pythonhosted.org/simple-requests/#simple_requests.ResponsePreprocessor)。由于依赖于gevent,HTTPS urls很棘手,而当前存在一个未解决的bug(https://github.com/gevent/gevent/issues/477)。在票据中有一个shim可以运行,但它仍然会针对SNI服务器抛出警告(但它*将*工作)。至于剪切,恐怕我所有的用法都在我的公司并且不公开。但是我向您保证,我们在数十个任务中执行了数千个请求。 - Monkey Boson
1
库在交互方面看起来很流畅。Python3+可用吗?抱歉没有看到任何提及。 - Isaac Philip
@Jethro 绝对正确,由于 Python 3 的底层技术非常不同,因此该库需要进行彻底的重写。目前,该库“完成”,但仅适用于 Python 2。 - Monkey Boson

4

免责声明:以下代码为每个函数创建不同的线程

这对于某些情况可能很有用,因为它更简单易用。但请注意,它不是异步的,而是使用多个线程给出异步的假象,尽管装饰器表明了这一点。

您可以使用以下装饰器,在函数执行完成后提供回调,回调必须处理函数返回的数据的处理。

请注意,函数装饰后将返回一个Future对象。

import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner

实现示例:

urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!


def _callback(resp):
    print(resp.url)
    print(resp)
    loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!


# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
    return requests.get(url)


for url in urls:
    get(url)

如果您想实时查看加载的URL,则可以在末尾添加以下代码:
while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break

1
这个可以工作,但它为每个请求生成一个新的线程,这似乎违背了使用asyncio的目的。 - rtaft
@rtaft 感谢您的建议,我已经更正了我的措辞。 - vaskrneup

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接