Throttling/rate-limiting HTTP requests in GRequests

28
I'm writing a small script in Python 2.7.3 with GRequests and lxml that gathers collectible card prices from several websites and compares them. The problem is that one of the websites limits the number of requests and sends back HTTP error 429 if I exceed it.
Is there a way to add throttling of the number of requests in GRequests so that I don't exceed the number of requests per second I specify? And how do I make GRequests retry after a while if HTTP 429 occurs?
On a side note - their limit is ridiculously low, something like 8 requests per 15 seconds. I've breached it more than once just by refreshing the page in my browser while waiting for price changes.

Are you registering multiple URLs against their site with grequests? If so, it would probably be better to make the requests to their site synchronously, so you can easily control the retries. - dm03514
I agree with @dm03514 - you don't need grequests. I recently read about an algorithm optimized for exactly this kind of problem. - Wayne Werner
I'm trying to hit multiple URLs on the same site concurrently, because I want to be able to monitor the prices of several cards at once. - Bartłomiej Siwek
@WayneWerner That sounds interesting. Would you mind sharing your synchronous approach? - Bartłomiej Siwek
5 Answers

30

Since I had to solve this problem myself and there seems to be very little information about it floating around, I'm answering my own question.

The idea is as follows. Every request object used with GRequests can take a session object as a parameter when it is created. Session objects, in turn, can have HTTP adapters mounted on them that are used when making requests. By creating our own adapter we can intercept requests and rate-limit them in whatever way works best for our application. In my case I ended up with the code below.

The object responsible for throttling:

import datetime

DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)


class BurstThrottle(object):
    max_hits = None
    hits = None
    burst_window = None
    total_window = None
    timestamp = None

    def __init__(self, max_hits, burst_window, wait_window):
        self.max_hits = max_hits
        self.hits = 0
        self.burst_window = burst_window
        self.total_window = burst_window + wait_window
        self.timestamp = datetime.datetime.min

    def throttle(self):
        now = datetime.datetime.utcnow()
        if now < self.timestamp + self.total_window:
            if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
                self.hits += 1
                return datetime.timedelta(0)
            else:
                return self.timestamp + self.total_window - now
        else:
            self.timestamp = now
            self.hits = 1
            return datetime.timedelta(0)
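
To make the behaviour concrete, here is a minimal sketch of how the class above could be exercised on its own (the numbers are arbitrary and not part of the original answer):

import datetime

# Allow at most 3 hits inside a 5-second burst window, then make callers wait
# out the remainder of a 20-second total window (burst + wait).
throttle = BurstThrottle(max_hits=3,
                         burst_window=datetime.timedelta(seconds=5),
                         wait_window=datetime.timedelta(seconds=15))

for i in range(5):
    delay = throttle.throttle()  # timedelta(0) means "send now"
    if delay > datetime.timedelta(0):
        print('request %d should wait %.1f more seconds' % (i, delay.total_seconds()))
    else:
        print('request %d may be sent immediately' % i)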

The HTTP adapter:

import gevent
import requests.adapters


class MyHttpAdapter(requests.adapters.HTTPAdapter):
    throttle = None

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
                 wait_window=DEFAULT_WAIT_WINDOW):
        self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
        super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
                                            max_retries=max_retries, pool_block=pool_block)

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        # Block until the throttle lets the request through, then keep retrying
        # for as long as the server answers 429 Too Many Requests.
        request_successful = False
        response = None
        while not request_successful:
            wait_time = self.throttle.throttle()
            while wait_time > datetime.timedelta(0):
                gevent.sleep(wait_time.total_seconds(), ref=True)
                wait_time = self.throttle.throttle()

            response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
                                                       verify=verify, cert=cert, proxies=proxies)

            if response.status_code != 429:
                request_successful = True

        return response

Setup:

import datetime

import grequests
import requests

import adapter  # the module containing BurstThrottle and MyHttpAdapter above

requests_adapter = adapter.MyHttpAdapter(
    pool_connections=__CONCURRENT_LIMIT__,
    pool_maxsize=__CONCURRENT_LIMIT__,
    max_retries=0,
    pool_block=False,
    burst_window=datetime.timedelta(seconds=5),
    wait_window=datetime.timedelta(seconds=20))

requests_session = requests.session()
requests_session.mount('http://', requests_adapter)
requests_session.mount('https://', requests_adapter)

unsent_requests = (grequests.get(url,
                                 hooks={'response': handle_response},
                                 session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)
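
The handle_response hook referenced in the setup isn't shown in the answer. A minimal placeholder (my own assumption about what it might do) could look like this; requests invokes response hooks with the Response object plus the keyword arguments of the original request:

def handle_response(response, *args, **kwargs):
    # Hypothetical stand-in for the hook used above: just report what came back.
    print('%s -> HTTP %d' % (response.url, response.status_code))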

10

Check out this tool for automatic request throttling: https://pypi.python.org/pypi/RequestsThrottler/0.2.2

You can set a fixed delay between requests, or a number of requests to send within a fixed amount of time (which is essentially the same thing):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', delay=1.5) as bt:
    throttled_requests = bt.multi_submit(reqs)

The multi_submit function returns a list of ThrottledRequest objects (see the link to the docs at the end).

You can then access the responses:

for tr in throttled_requests:
    print tr.response

Or you can achieve the same thing by specifying the number of requests to send within a fixed amount of time (e.g. 15 requests every 60 seconds):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', reqs_over_time=(15, 60)) as bt:
    throttled_requests = bt.multi_submit(reqs)

Both solutions can also be used without the with statement:

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
bt = BaseThrottler(name='base-throttler', delay=1.5)
bt.start()
throttled_requests = bt.multi_submit(reqs)
bt.shutdown()

For more details, see: http://pythonhosted.org/RequestsThrottler/index.html


2
For anyone wanting better control over throttling/rate limiting, there is the Python library ratelimit 2.2.1: https://pypi.org/project/ratelimit/ . The package provides a function decorator that prevents a function from being called more often than the API provider allows, which should keep the provider from banning your application as long as you stay within the advertised limit.
from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response
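
As written, @limits raises ratelimit's RateLimitException once the limit is exceeded. If you would rather block until the period resets - closer to what the question asks for - the library also provides a sleep_and_retry decorator that can be stacked on top, for example:

from ratelimit import limits, sleep_and_retry
import requests

FIFTEEN_MINUTES = 900

@sleep_and_retry                       # sleep out the remainder of the window instead of raising
@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response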

1
I ran into a similar problem. Here's my solution. For your case, I would do:
def worker():
    with rate_limit('slow.domain.com', 2):
        response = requests.get('https://slow.domain.com/path')
        text = response.text
    # Use `text`

Assuming you have multiple domains you're pulling from, I would set up a dictionary mapping (domain, delay) so you don't exceed the rate limits (see the usage sketch after the code below).
This code assumes you're going to use gevent and monkey patch.
from contextlib import contextmanager
from time import time

import gevent
from gevent.event import Event
from gevent.queue import Queue


def rate_limit(resource, delay, _queues={}):
    """Delay use of `resource` until after `delay` seconds have passed.

    Example usage:

    def worker():
        with rate_limit('foo.bar.com', 1):
            response = requests.get('https://foo.bar.com/path')
            text = response.text
        # use `text`

    This will serialize and delay requests from multiple workers for resource
    'foo.bar.com' by 1 second.

    """

    if resource not in _queues:
        queue = Queue()
        gevent.spawn(_watch, queue)
        _queues[resource] = queue

    return _resource_manager(_queues[resource], delay)


def _watch(queue):
    "Watch `queue` and wake event listeners after delay."

    last = 0

    while True:
        event, delay = queue.get()

        now = time()

        if (now - last) < delay:
            gevent.sleep(delay - (now - last))

        event.set()   # Wake worker but keep control.
        event.clear()
        event.wait()  # Yield control until woken.

        last = time()


@contextmanager
def _resource_manager(queue, delay):
    "`with` statement support for `rate_limit`."

    event = Event()
    queue.put((event, delay))

    event.wait() # Wait for queue watcher to wake us.

    yield

    event.set()  # Wake queue watcher.
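
A possible way to apply this to the question's scenario (the URLs, domain, and 2-second delay below are illustrative, not from the original answer) is to monkey-patch first and then spawn one greenlet per card URL, letting rate_limit serialize access to the slow domain:

from gevent import monkey
monkey.patch_all()  # patch the standard library before importing requests

import gevent
import requests

# Illustrative mapping of domain -> minimum delay (seconds) between requests.
DOMAIN_DELAYS = {'slow.domain.com': 2}

CARD_URLS = [
    'https://slow.domain.com/card/1',
    'https://slow.domain.com/card/2',
    'https://slow.domain.com/card/3',
]


def fetch_card(url, domain='slow.domain.com'):
    # rate_limit is the context manager defined above.
    with rate_limit(domain, DOMAIN_DELAYS[domain]):
        response = requests.get(url)
    return response.text


jobs = [gevent.spawn(fetch_card, url) for url in CARD_URLS]
gevent.joinall(jobs)
card_pages = [job.value for job in jobs]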

1

It doesn't look like there's any simple mechanism for handling this built into the requests or grequests code. The only hook that seems to be available is for responses.

Here's a super hacky work-around, at least as a proof of concept - I modified grequests to keep a list of the times at which requests were issued, and to sleep the creation of the AsyncRequest until the number of requests per window is below the maximum.

# `q` is a module-level list of send timestamps added to grequests for this hack.
q = []


class AsyncRequest(object):
    def __init__(self, method, url, **kwargs):
        print self, 'init'
        waiting = True
        while waiting:
            # Allow a new request only if fewer than 8 were issued in the last 15 seconds.
            if len([x for x in q if x > time.time() - 15]) < 8:
                q.append(time.time())
                waiting = False
            else:
                print self, 'snoozing'
                gevent.sleep(1)

You can use grequests.imap() to watch this interactively:
import time

import rg  # the locally modified copy of grequests described above

urls = [
        'http://www.heroku.com',
        'http://python-tablib.org',
        'http://httpbin.org',
        'http://python-requests.org',
        'http://kennethreitz.com',
        'http://www.cnn.com',
]

def print_url(r, *args, **kwargs):
        print r.url, time.time()

hook_dict=dict(response=print_url)
rs = (rg.get(u, hooks=hook_dict) for u in urls)
for r in rg.imap(rs):
        print r

I was hoping for something more elegant, but so far I can't find it. Looked around in sessions and adapters. Maybe the poolmanager could be augmented instead?
Also, I wouldn't put this code in production - the 'q' list never gets trimmed and would eventually grow quite large. Besides, I don't know if it's even working as advertised; it just looks like it is when I watch the console output.
Ugh, just looking at this code I can tell it's 3 in the morning. Time to go to bed.
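
One way to address the first caveat - a sketch of my own, not part of the original answer - is to prune the list on every check so it never holds more than the last 15 seconds of timestamps:

import time

# Module-level list of recent send timestamps, as in the hack above.
q = []


def may_send_now(window=15, limit=8):
    """Trim timestamps older than `window` seconds, then report whether
    another request can be issued without exceeding `limit`."""
    cutoff = time.time() - window
    q[:] = [x for x in q if x > cutoff]  # trim in place so `q` stays small
    if len(q) < limit:
        q.append(time.time())
        return True
    return False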
