Django REST框架:按用户限制请求频率

11

我有一些用户需要非常高的节流,以便他们可以经常使用系统。有没有简单的方法可以为他们提供比其他用户更高的节流?

我已经找过了,但没有找到任何东西。

3个回答

18

我通过扩展UserRateThrottle并在我的settings文件中添加特殊用户的方式找到了解决方法。

这个类只是覆盖了allow_request方法,并添加了一些特殊逻辑,以查看用户名是否在OVERRIDE_THROTTLE_RATES变量中列出:

class ExceptionalUserRateThrottle(UserRateThrottle):
    def allow_request(self, request, view):
        """
        Give special access to a few special accounts.

        Mirrors code in super class with minor tweaks.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Adjust if user has special privileges.
        override_rate = settings.REST_FRAMEWORK['OVERRIDE_THROTTLE_RATES'].get(
            request.user.username,
            None,
        )
        if override_rate is not None:
            self.num_requests, self.duration = self.parse_rate(override_rate)

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

要使用它,只需将DEFAULT_THROTTLE_CLASS设置为此类,然后像这样将一些特殊用户放入OVERRIDE_THROTTLE_RATES

'DEFAULT_THROTTLE_CLASSES': (
    'rest_framework.throttling.AnonRateThrottle',
    'cl.api.utils.ExceptionalUserRateThrottle',
),
'DEFAULT_THROTTLE_RATES': {
    'anon': '100/day',
    'user': '1000/hour',
},
'OVERRIDE_THROTTLE_RATES': {
    'scout': '10000/hour',
    'scout_test': '10000/hour',
},

谢谢你的回答!这个想法真的很棒,绝对值得包含在DRF中。 - blacatus
我还没有尝试将这个推送到上游,但如果有人试图将其纳入主代码库,我会感到高兴。 - mlissner
2
对我来说,理想的解决方案应该是在Django管理页面中按组对用户进行分组,并通过这些组进行限流。尝试在您的代码和这个想法之间进行混合。如果我想到了什么,会在这里发布。 - blacatus

2

在自定义Django REST限流后,我已经找到了解决方案,

它会在3次登录尝试后阻止特定用户(阻止应用程序中存在的user_id)。对于匿名用户,在6次登录尝试后阻止IP地址。

prevent.py :-

#!/usr/bin/python

from collections import Counter

from rest_framework.throttling import SimpleRateThrottle
from django.contrib.auth.models import User


class UserLoginRateThrottle(SimpleRateThrottle):
    scope = 'loginAttempts'

    def get_cache_key(self, request, view):
        user = User.objects.filter(username=request.data.get('username'))
        ident = user[0].pk if user else self.get_ident(request)

        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.
        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()

        if len(self.history) >= self.num_requests:
            return self.throttle_failure()

        if len(self.history) >= 3:
            data = Counter(self.history)
            for key, value in data.items():
                if value == 2:
                    return self.throttle_failure()
        return self.throttle_success(request)

    def throttle_success(self, request):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        user = User.objects.filter(username=request.data.get('username'))
        if user:
            self.history.insert(0, user[0].id)
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

Views.py文件:

from .prevent import UserLoginRateThrottle
   ....
   ....
   ....
   class ObtainAuthToken(auth_views.ObtainAuthToken):
       throttle_classes = (UserLoginRateThrottle,)/use this method here your login view

       def post(self, request, *args, **kwargs):
           ....
           ....

Settings.py :-

Django-rest-framework

REST_FRAMEWORK = {
    ...
    ...
    ...
    'DEFAULT_THROTTLE_CLASSES': (
        'rest_framework.throttling.UserRateThrottle',

    ),
    'DEFAULT_THROTTLE_RATES': {
        'loginAttempts': '6/hr',
        'user': '1000/min',
    }
}

你为什么要在 throttle_success 函数里这样做呢? 如果 user 存在: self.history.insert(0, user[0].id) user.id 和 time 是两个不同的属性,对吧? - Anmol Vijaywargiya
这怎么回答问题呢?用户问的是如何增加特定用户的访问权限,而不是如何阻止登录认证失败的用户。 - frederj

1

我知道这是一个相当老的帖子,而且被接受的答案对我也很有帮助。我想展示如何设置多个用户速率限制,在这种情况下,为root用户添加额外的限制。

from rest_framework.settings import api_settings
from django.core.exceptions import ImproperlyConfigured

class RootRateThrottle(UserRateThrottle):
    """
    Limits the rate of API calls that may be made by a given user.
    The user id will be used as a unique cache key if the user is
    authenticated.  For anonymous requests, the IP address of the request will
    be used.
    """

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            ident = request.user.pk
        else:
            ident = self.get_ident(request)

        self.rate = self.get_rate(request)
        logger.debug(
            "Throttling rate for %s: %s", request.user, self.rate
        )

        self.num_requests, self.duration = self.parse_rate(self.rate)
        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

    def get_rate(self, request=None):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        if request and request.user.is_superuser:
            throttle_rates = settings.REST_FRAMEWORK["ROOT_THROTTLE_RATES"]
        else:
            throttle_rates = api_settings.DEFAULT_THROTTLE_RATES
        try:
            return throttle_rates[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

class ByMinuteRateThrottle(RootRateThrottle):
    scope = 'minute'


class ByHourRateThrottle(RootRateThrottle):
    scope = 'hour'


class ByDayRateThrottle(RootRateThrottle):
    scope = 'day'

设置部分的样子就像这样。
'DEFAULT_THROTTLE_CLASSES': [
    'threedi_api.throttling.ByMinuteRateThrottle',
    'threedi_api.throttling.ByHourRateThrottle',
    'threedi_api.throttling.ByDayRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
    'minute': '100/min',
    'hour': '1000/hour',
    'day': '5000/day',
},
'ROOT_THROTTLE_RATES': {
    'minute': '200/min',
    'hour': '2000/hour',
    'day': '10000/day',
},

这似乎很有用,但你把代码放在哪里? - SJ19

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接