什么是在DRF中测试节流的正确方式?我在网络上找不到任何答案。我想为每个端点编写单独的测试,因为每个端点都有自定义的请求限制(ScopedRateThrottle)。
重要的是它不能影响其他测试 - 它们必须以某种方式在没有节流和限制的情况下运行。
什么是在DRF中测试节流的正确方式?我在网络上找不到任何答案。我想为每个端点编写单独的测试,因为每个端点都有自定义的请求限制(ScopedRateThrottle)。
重要的是它不能影响其他测试 - 它们必须以某种方式在没有节流和限制的情况下运行。
get_rate
方法进行修补。感谢tprestegard的评论!在我的情况下,我有一个自定义类:from rest_framework.throttling import UserRateThrottle
class AuthRateThrottle(UserRateThrottle):
scope = 'auth'
在你的测试中:
from unittest.mock import patch
from django.core.cache import cache
from rest_framework import status
class Tests(SimpleTestCase):
def setUp(self):
cache.clear()
@patch('path.to.AuthRateThrottle.get_rate')
def test_throttling(self, mock):
mock.return_value = '1/day'
response = self.client.post(self.url, {})
self.assertEqual(
response.status_code,
status.HTTP_400_BAD_REQUEST, # some fields are required
)
response = self.client.post(self.url, {})
self.assertEqual(
response.status_code,
status.HTTP_429_TOO_MANY_REQUESTS,
)
还可以通过对DRF包中的方法进行补丁(patch)来更改标准限流类的行为:@patch('rest_framework.throttling.SimpleRateThrottle.get_rate')
像其他人提到的那样,这并不完全属于单元测试范畴,但是,怎么样简单地做一些类似这样的事情:
from django.core.urlresolvers import reverse
from django.test import override_settings
from rest_framework.test import APITestCase, APIClient
class ThrottleApiTests(APITestCase):
# make sure to override your settings for testing
TESTING_THRESHOLD = '5/min'
# THROTTLE_THRESHOLD is the variable that you set for DRF DEFAULT_THROTTLE_RATES
@override_settings(THROTTLE_THRESHOLD=TESTING_THRESHOLD)
def test_check_health(self):
client = APIClient()
# some end point you want to test (in this case it's a public enpoint that doesn't require authentication
_url = reverse('check-health')
# this is probably set in settings in you case
for i in range(0, self.TESTING_THRESHOLD):
client.get(_url)
# this call should err
response = client.get(_url)
# 429 - too many requests
self.assertEqual(response.status_code, 429)
setUp
或setUpTestData
中进行用户创建,测试就会被隔离(应该如此),因此不必担心“脏”数据或范围方面的问题。tearDown
中添加cache.clear()
或尝试清除为限制定义的特定键。我为基于用户和调用请求的参数进行节流实现了自己的缓存机制。您可以覆盖 SimpleRateThrottle.get_cache_key
方法来获得此行为。
以这个节流类为例:
class YourCustomThrottleClass(SimpleRateThrottle):
rate = "1/d"
scope = "your-custom-throttle"
def get_cache_key(self, request: Request, view: viewsets.ModelViewSet):
# we want to throttle the based on the request user as well as the parameter
# `foo` (i.e. the user can make a request with a different `foo` as many times
# as they want in a day, but only once a day for a given `foo`).
foo_request_param = view.kwargs["foo"]
ident = f"{request.user.pk}_{foo_request_param}"
# below format is copied from `UserRateThrottle.get_cache_key`.
return self.cache_format % {"scope": self.scope, "ident": ident}
TestCase
中清除这个问题,我会根据需要在每个测试方法中调用以下方法:def _clear_throttle_cache(self, request_user, foo_param):
# we need to clear the cache of the throttle limits already stored there.
throttle = YourCustomThrottleClass()
# in the below two lines mock whatever attributes on the request and
# view instances are used to calculate the cache key in `.get_cache_key`
# which you overrode. Here we use `request.user` and `view.kwargs["foo"]`
# to calculate the throttle key, so we mock those.
pretend_view = MagicMock(kwargs={foo: foo_param})
pretend_request = MagicMock(user=request_user)
# this is the method you overrode in `YourCustomThrottleClass`.
throttle_key = throttle.get_cache_key(pretend_request, pretend_view)
throttle.cache.delete(user_key)