aiohttp:限制并发请求速率

36

API通常有速率限制,用户必须遵循。例如,每秒50个请求。连续请求需要0.5-1秒钟的时间,因此太慢而无法接近该限制。然而,使用aiohttp进行并行请求会超过速率限制。

为了以允许的最快速度轮询API,需要对并发调用进行速率限制。

到目前为止我找到的例子大致如下所示:session.get

session.get = rate_limited(max_calls_per_second)(session.get)

这对于顺序调用非常有效。但是尝试在并行调用中实现此操作并不能按预期工作。

以下为示例代码:

async with aiohttp.ClientSession() as session:
    session.get = rate_limited(max_calls_per_second)(session.get)
    tasks = (asyncio.ensure_future(download_coroutine(  
          timeout, session, url)) for url in urls)
    process_responses_function(await asyncio.gather(*tasks))

这样做的问题在于它将限制任务排队的速度。使用gather执行仍然会在更或多或少同时发生。两全其美?;-)。

是的,我在这里找到了一个类似的问题,但是没有回答实际限制请求速率的问题。此外,Quentin Pradet的博客文章只能在限制任务排队方面起作用。

总之:如何限制并行aiohttp请求的每秒请求数?


相关问题可以在 https://dev59.com/kFsW5IYBdhLWcg3wQVX2#35198369 找到。 - Mihai8
5
是的,就像我在帖子中提到的那样。不幸的是,那个问题没有得到正确回答。回复中谈论了限制同时请求的数量。被问到的是每秒请求数量。 - Boffin
5个回答

25

如果我理解得正确,您希望限制同时发出的请求数量?

asyncio 中有一个名为 Semaphore 的对象,它的工作方式类似于异步的 RLock

semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
    async with semaphore:
        # do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])

更新

假设我同时发起50个请求, 它们在2秒内全部完成。这意味着它不会触及限制(每秒仅允许25个请求)。

那么,我应该发起100个并发请求,它们也都在2秒内完成(每秒50个请求)。但在实际发出这些请求之前,你怎么确定它们将需要多长时间才能完成呢?

或者,如果您不介意“每秒完成的请求数”,而是关心“每秒发出的请求数”,则可以执行以下操作:

async def loop_wrap(urls):
    for url in urls:
        asyncio.ensure_future(download(url))
        await asyncio.sleep(1/50)

asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()

以上代码将每1/50秒创建一个Future实例。


6
不,这是关于限制每秒请求的数量,即每秒发送请求的次数。同时请求的数量取决于这些请求所需的时间,但我们想要使用的 API 没有对此进行限制。 - Boffin
@Boffin 这是不可能的。首先你需要发出那些请求,然后才能得知它们所需的时间。你想要的是预测。例如,我首先发出50个并发请求,如何确定它们是否会在一秒钟内完成? - Sraw
1
请求所需的时间长度并不重要。我想在上一个请求之后的1/50秒内启动另一个请求。请求需要(假设)1秒钟,因此必须同时发送多个请求。然而,同时运行多少个请求并不重要。也许我在这里有什么误解? - Boffin
@Boffin 已更新,希望有所帮助。 - Sraw
更新后的代码似乎等价于 for url in urls: await asyncio.sleep(1/50); await download_coroutine(url) - user4815162342
显示剩余4条评论

19

我通过创建的子类,并基于漏桶算法添加速率限制器来解决这个问题。与使用Semaphores不同,我使用asyncio.Queue()进行速率限制。我只覆盖了_request()方法。我认为这种方法更加简洁,因为你只需要将session = aiohttp.ClientSession()替换为session = ThrottledClientSession(rate_limit=15)即可。

class ThrottledClientSession(aiohttp.ClientSession):
    """
    Rate-throttled client session class inherited from aiohttp.ClientSession)

    USAGE:
        replace `session = aiohttp.ClientSession()`
        with `session = ThrottledClientSession(rate_limit=15)`

    see https://dev59.com/aVYM5IYBdhLWcg3wjAuw#60357775
    """

    MIN_SLEEP = 0.1

    def __init__(self, rate_limit: float = None, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.rate_limit = rate_limit
        self._fillerTask = None
        self._queue = None
        self._start_time = time.time()
        if rate_limit is not None:
            if rate_limit <= 0:
                raise ValueError('rate_limit must be positive')
            self._queue = asyncio.Queue(min(2, int(rate_limit) + 1))
            self._fillerTask = asyncio.create_task(self._filler(rate_limit))

    def _get_sleep(self) -> Optional[float]:
        if self.rate_limit is not None:
            return max(1 / self.rate_limit, self.MIN_SLEEP)
        return None

    async def close(self) -> None:
        """Close rate-limiter's "bucket filler" task"""
        if self._fillerTask is not None:
            self._fillerTask.cancel()
        try:
            await asyncio.wait_for(self._fillerTask, timeout=0.5)
        except asyncio.TimeoutError as err:
            print(str(err))
        await super().close()

    async def _filler(self, rate_limit: float = 1):
        """Filler task to fill the leaky bucket algo"""
        try:
            if self._queue is None:
                return
            self.rate_limit = rate_limit
            sleep = self._get_sleep()
            updated_at = time.monotonic()
            fraction = 0
            extra_increment = 0
            for i in range(0, self._queue.maxsize):
                self._queue.put_nowait(i)
            while True:
                if not self._queue.full():
                    now = time.monotonic()
                    increment = rate_limit * (now - updated_at)
                    fraction += increment % 1
                    extra_increment = fraction // 1
                    items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
                    fraction = fraction % 1
                    for i in range(0, items_2_add):
                        self._queue.put_nowait(i)
                    updated_at = now
                await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            print('Cancelled')
        except Exception as err:
            print(str(err))

    async def _allow(self) -> None:
        if self._queue is not None:
            # debug
            # if self._start_time == None:
            #    self._start_time = time.time()
            await self._queue.get()
            self._queue.task_done()
        return None

    async def _request(self, *args, **kwargs)  -> aiohttp.ClientResponse:
        """Throttled _request()"""
        await self._allow()
        return await super()._request(*args, **kwargs)


如何使这个子类可访问?当我将它放入我的主要脚本中时,出现了错误:AttributeError: module 'aiohttp' has no attribute 'ThrottledClientSession' - Matthew J. Oldach
嗨,新的子类不会成为aiohttp包的一部分。您可以将该类添加到同一源文件中,或者导入它:'从类源文件的文件名导入ThrottledClientSession'。 - Jylpah
1
你可以在这里找到最新版本:https://github.com/Jylpah/blitz-tools/blob/master/blitzutils.py - Jylpah
2
有人同意这个解决方案并编写了一个包 https://aiolimiter.readthedocs.io/en/latest/ - hoj201
2
@thatrandomperson,如果你指的是我的 ThrottledClientSession(),我已经将它移动到一个新的仓库 https://github.com/Jylpah/pyutils。 请参阅:https://github.com/Jylpah/pyutils/blob/main/throttledclientsession.py - Jylpah
显示剩余2条评论

4

我喜欢@sraw使用asyncio的方法,但他们的回答对我来说不太合适。由于我不知道我的下载调用是否会比期望的速率限制更快或更慢,因此我想有一个选择,在请求很慢时并行运行多个请求,在请求非常快时一次只运行一个请求,以便我始终保持在速率限制之内。

我通过使用一个队列和一个生产者以速率限制产生新任务,然后许多消费者将等待下一个作业(如果它们很快),或者如果它们很慢,则队列中会有积压作业,并且将以处理器/网络允许的速度运行:

import asyncio
from datetime import datetime 

async def download(url):
  # download or whatever
  task_time = 1/10
  await asyncio.sleep(task_time)
  result = datetime.now()
  return result, url

async def producer_fn(queue, urls, max_per_second):
  for url in urls:
    await queue.put(url)
    await asyncio.sleep(1/max_per_second)
 
async def consumer(work_queue, result_queue):
  while True:
    url = await work_queue.get()
    result = await download(url)
    work_queue.task_done()
    await result_queue.put(result)

urls = range(20)
async def main():
  work_queue = asyncio.Queue()
  result_queue = asyncio.Queue()

  num_consumer_tasks = 10
  max_per_second = 5
  consumers = [asyncio.create_task(consumer(work_queue, result_queue))
               for _ in range(num_consumer_tasks)]    
  producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
  await producer

  # wait for the remaining tasks to be processed
  await work_queue.join()
  # cancel the consumers, which are now idle
  for c in consumers:
    c.cancel()

  while not result_queue.empty():
    result, url = await result_queue.get()
    print(f'{url} finished at {result}')
 
asyncio.run(main())

2
我开发了一个名为octopus-api的库(https://pypi.org/project/octopus-api/),它使用aiohttp并在底层进行速率限制和设置连接数(并行)调用端点。它的目标是简化所有需要的aiohttp设置。
以下是如何使用它的示例,其中get_ethereum是用户定义的请求函数:
from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List

if __name__ == '__main__':
    async def get_ethereum(session: TentacleSession, request: Dict):
        async with session.get(url=request["url"], params=request["params"]) as response:
            body = await response.json()
            return body

    client = OctopusApi(rate=50, resolution="sec", connections=6)
    result: List = client.execute(requests_list=[{
        "url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
        "params": {}}] * 1000, func=get_ethereum)
    print(result)
与的POST、GET、PUT和PATCH方法使用方式相同,可帮助您解决与速率限制和并行调用相关的问题。


在Github上零星评价和1个Stack点,对于看起来很有用的库来说非常奇怪... - AntonOfTheWoods

0

关于在调用gather()时同时发送n个请求的问题,关键是在每次调用之前使用create_task()和await asyncio.sleep(1.1)。使用create_task创建的任何任务都会立即运行:

    for i in range(THREADS):
        await asyncio.sleep(1.1)
        tasks.append(
            asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
        )
    await asyncio.gather(*tasks) 

另一个限制同时连接数量的问题在下面的例子中也通过在async_payload_wrapper中使用ClientSession()上下文并设置具有限制的连接器来解决。
通过这种设置,我可以运行25个协程(THREADS=25),每个循环遍历一个URL队列,而不违反25个并发连接规则。
async def send_request(session, url, routine):
    start_time = time.time()
    print(f"{routine}, sending request: {datetime.now()}")
    params = {
                'api_key': 'nunya',
                'url': '%s' % url, 
                'render_js': 'false',
                'premium_proxy': 'false', 
                'country_code':'us'
            }
    try:
        async with session.get(url='http://yourAPI.com',params=params,) as response:              
            data = await response.content.read()                     
            print(f"{routine}, done request: {time.time() - start_time} seconds")                    
        return data

    except asyncio.TimeoutError as e:    
        print('timeout---------------------')  
        errors.append(url)
    except aiohttp.ClientResponseError as e:
        print('request failed - Server Error')
        errors.append(url)
    except Exception as e:
        errors.append(url)

async def getData(session, q, test):
    while True:
        if not q.empty():
            url = q.get_nowait()
            resp = await send_request(session, url ,test)                      
            if resp is not None:
                processData(resp, test, url)
        else:
            print(f'{test} queue empty')
            break

async def async_payload_wrapper():
    tasks = []
    q = asyncio.Queue()
    for url in urls:
        await q.put(url)  


    async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS), timeout=ClientTimeout(total=61), raise_for_status=True) as session:    

        for i in range(THREADS):
            await asyncio.sleep(1.1)
            tasks.append(
                asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
            )
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(async_payload_wrapper())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接