如何在Python中限制对Web服务的请求速率?

26

我正在开发一个与Web服务API交互的Python库。和许多我遇到的Web服务一样,这个请求限制请求速率。我想提供一个可选参数limit来实例化类,如果提供了这个参数,将会等待指定秒数的时间后再发送请求。

我理解一般的情况是:类的一个实例通过方法发出请求。当它这样做时,该方法发出某个信号,在某个地方设置一个锁变量,并开始倒计时limit中指定的秒数。(很可能,锁就是倒计时器本身。)如果在这个时间范围内进行了另一个请求,则必须排队等待,直到倒计时器归零并且锁被解除;此时,队列中最旧的请求将被发送,倒计时器将被重置并重新启用锁。

这是否需要使用线程?还有其他方法吗?

倒计时器和锁应该是实例变量,还是属于类,以便所有类的实例都持有请求?

此外,在库中提供速率限制功能通常是一个坏主意吗?我认为,由于默认情况下,倒计时为零秒,所以库仍然允许开发人员使用库并提供自己的速率限制方案。然而,考虑到使用该服务的任何开发人员都需要对请求进行速率限制,我认为提供速率限制的方法会很方便。

无论是否在库中放置速率限制方案,我都想编写一个使用该库的应用程序,因此建议的技术将会派上用场。

7个回答

34

除非必要,不要重复发明轮子。查看超棒的库ratelimit。如果你只想限制调用rest api的速率以及处理其他事情,使用它是完美的选择。

from datetime import timedelta
from ratelimit import limits, sleep_and_retry
import requests

@sleep_and_retry
@limits(calls=1, period=timedelta(seconds=60).total_seconds())
def get_foobar():
    response = requests.get('https://httpbin.org/get')
    response.raise_for_status()
    return response.json()

如果每分钟发出的请求超过一个,这将阻塞线程。


非常感谢您,@vidstige。我一直在苦苦思索如何实现每分钟60次的速率限制解决方案。 - call-in-co

13

使用队列和调度器可以更好地解决这个问题。

您将处理过程分为两个方面:调度。它们可以是不同的线程(如果更容易,则可以是不同的进程)。

侧以满足它们的愿望的任何速率创建并排队请求。

调度侧执行以下操作。

  1. 获取请求开始时间,s

  2. 取消排队的请求,通过远程服务处理请求。

  3. 获取当前时间,t。 睡眠 速率 - (t - s) 秒。

如果要直接将侧连接到远程服务运行,则可以绕过速率限制。 这对于使用远程服务的模拟版本进行内部测试非常有用。

这个困难在于为每个请求创建一些表示形式,您可以将其加入队列。 由于 Python的Queue 几乎可以处理任何内容,因此您不必做太多工作。

如果使用多处理,则必须pickle您的对象以将它们放入管道中。


2
排队可能过于复杂。一个更简单的解决方案是给你的类一个变量,用于记录服务上次调用的时间。每当服务被调用时(!1),将waitTime设置为delay - Now + lastcalltime。其中delay应等于请求之间允许的最小时间。如果这个数字是正数,在发起调用之前就要睡眠相应的时间(!2)。这种方法的缺点/优点是它将Web服务请求视为同步请求。优点是它非常简单易实现。
  • (!1):在包装器内(可能在包装器底部)收到服务响应后立即发生。
  • (!2):在Python封装Web服务时调用时,在包装器顶部发生。
S.Lott的解决方案当然更优雅。

2

使用time.sleep()在请求之间添加2秒的暂停,如下所示:

import time
import requests

for i in range(10):
    requests.get('http://example.com')
    time.sleep(2)

错误的假设。它等待2秒钟。这将导致一个结束和另一个开始之间有2秒的时间间隔。通常情况下,您希望在一个开始和另一个开始之间有2秒的时间间隔。 - S.Lott
等待一个请求结束和另一个请求开始之间的2秒可能更安全,如果限制是基于实际调用时间完成的。真正的问题是你的解决方案在请求之间等待了超过2秒的时间,因为请求之间的计算可能需要时间。 - Brian

1

你的速率限制方案应该受到底层代码调用约定(同步或异步)以及此速率限制将在哪个范围(线程、进程、机器、集群?)上运行的影响。

我建议将所有变量保留在实例内部,这样您就可以轻松地实现多个控制周期/速率。

最后,听起来你想成为一个中间件组件。不要试图成为一个应用程序并自己引入线程。如果你是同步的,只需阻塞/休眠,如果你被其中之一调用,请使用异步调度框架。


1

如果您的库是同步设计的,那么我建议不要包含限制执行(尽管您可以跟踪速率并帮助调用者决定如何遵守限制)。

我现在使用twisted与几乎所有东西进行接口交互。通过拥有一个将请求提交与响应处理分离的模型,它使得这种类型的事情变得容易。如果您不想让API用户使用twisted,您至少应该了解他们的延迟执行API。

例如,我有一个Twitter接口,代表xmpp用户推送了相当荒谬数量的请求。我没有速率限制,但我必须做一些工作来防止所有请求同时发生。


1
yfinance包的文档中,他们展示了一种很好且简洁的方式来同时进行速率限制和响应缓存。因为在开发和调试过程中,我经常会重复执行相同的请求。
from requests import Session
from requests_cache import CacheMixin, SQLiteCache
from requests_ratelimiter import LimiterMixin, MemoryQueueBucket
from pyrate_limiter import Duration, RequestRate, Limiter

class CachedLimiterSession(CacheMixin, LimiterMixin, Session)

session = CachedLimiterSession(
    limiter=Limiter(RequestRate(2, Duration.SECOND*5)),  # max 2 requests per 5 seconds
    bucket_class=MemoryQueueBucket,
    backend=SQLiteCache("yfinance.cache"),
)
response = requests.get('https://httpbin.org/get')
response.raise_for_status()
response.json()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接