如何使用Redis实现速率限制

25

我使用 INCREXPIRE 实现速率限制,例如每分钟 5 次请求:

if EXISTS counter
    count = INCR counter
else
    EXPIRE counter 60
    count = INCR counter

if count > 5
    print "Exceeded the limit"    

然而,在最后一秒钟可以发送5个请求,在第二分钟的第一秒钟又可以发送5个请求,即在两秒钟内发送10个请求。

如何避免这个问题?


更新:我想出了这个列表实现。这是一个好方法吗?

times = LLEN counter
if times < 5
    LPUSH counter now()
else
    time = LINDEX counter -1
    if now() - time < 60
        print "Exceeded the limit"
    else
        LPUSH counter now()
LTRIM counter 5

是的,那是一个有效而好的解决方案。甚至比使用集合更好 ;) - alto
1
你的解决方案中Redis LUA脚本不支持now()吗?那么你想将now()作为参数传递,这样不同的机器将具有不同的毫秒粒度,因此now() - time将不准确? - Kanagavelu Sugumar
对于第二个例子,我猜在大约120秒后过期counter是有意义的,特别是如果你有很多counter键。 - keune
2
前五个请求是突发的,它们之间没有(now() - time < 60)分钟的时间间隔... - Kanagavelu Sugumar
11个回答

15
您可以将“最近一分钟内5个请求”更改为“每分钟5个请求”。这样可以实现以下操作:
counter = current_time # for example 15:03
count = INCR counter
EXPIRE counter 60 # just to make sure redis doesn't store it forever

if count > 5
  print "Exceeded the limit"

如果您想继续使用“每分钟5个请求”的限制,则可以执行以下操作:
counter = Time.now.to_i # this is Ruby and it returns the number of milliseconds since 1/1/1970
key = "counter:" + counter
INCR key
EXPIRE key 60

number_of_requests = KEYS "counter"*"
if number_of_requests > 5
  print "Exceeded the limit"

如果您有生产约束(尤其是性能方面),不建议使用KEYS关键字。我们可以使用sets代替:(参考链接)
counter = Time.now.to_i # this is Ruby and it returns the number of milliseconds since 1/1/1970
set = "my_set"
SADD set counter 1

members = SMEMBERS set

# remove all set members which are older than 1 minute
members {|member| SREM member if member[key] < (Time.now.to_i - 60000) }

if (SMEMBERS set).size > 5
  print "Exceeded the limit"

这是伪代码,但可以让您了解基本思路。

你说得没错。但是我们还不知道任何与生产相关的限制。尽管如此,我已经编辑了我的答案,使用Redis集合代替。 - alto
第三个想法太棒了!它对我帮助很大。非常感谢你的绝妙想法。只有一个评论:如果没有添加新元素,集合应在1分钟后过期。这将有助于节省存储空间。 类似于:在“SADD set counter 1”之后立即使用“EXPIRE set 60”。 - Phong Bui
那甚至更好。 - alto

8
使用Leaky bucket算法是限流的规范方式。使用计数器的缺点是,用户可以在计数器重置后立即执行一堆请求,例如在下一分钟的第一秒执行5个操作。Leaky bucket算法解决了这个问题。简单来说,您可以使用有序集合存储您的“漏桶”,使用动作时间戳作为键来填充它。
请查看这篇文章以获取精确的实现方法:使用Redis Sorted Sets进行更好的速率限制 更新: 还有另一种算法,与leaky bucket相比具有一些优势。它被称为通用小区速率算法。这是它在高层次上的工作原理,如速率限制、细胞和GCRA所述:
GCRA通过跟踪“理论到达时间”(TAT)来追踪剩余限制,该时间是在第一个请求中添加代表其成本的持续时间到当前时间。成本是我们“发射间隔”(T)的倍数计算的,该间隔是从我们想要桶重新填充的速率派生的。当任何后续请求到来时,我们取现有的TAT,从中减去表示极限总爆发容量的固定缓冲区(τ+T),并将结果与当前时间进行比较。这个结果表示下一次允许请求的时间。如果它在过去,我们允许传入请求,如果它在未来,我们就不允许。成功请求后,通过添加T来计算新的TAT。

有一个实现此算法的Redis模块可在GitHub上找到: https://github.com/brandur/redis-cell


4
这是一个旧问题,已经有答案了,但是我从这里得到了一些灵感并进行了实现。我正在使用Node.js的ioredis
以下是异步且无竞争条件(我希望如此)的滚动时间窗口限制器:
var Ioredis = require('ioredis');
var redis = new Ioredis();

// Rolling window rate limiter
//
// key is a unique identifier for the process or function call being limited
// exp is the expiry in milliseconds
// maxnum is the number of function calls allowed before expiry
var redis_limiter_rolling = function(key, maxnum, exp, next) {
  redis.multi([
    ['incr', 'limiter:num:' + key],
    ['time']
  ]).exec(function(err, results) {
    if (err) {
      next(err);
    } else {
      // unique incremented list number for this key
      var listnum = results[0][1];
      // current time
      var tcur = (parseInt(results[1][1][0], 10) * 1000) + Math.floor(parseInt(results[1][1][1], 10) / 1000);
      // absolute time of expiry
      var texpiry = tcur - exp;
      // get number of transacation in the last expiry time
      var listkey = 'limiter:list:' + key;
      redis.multi([
        ['zadd', listkey, tcur.toString(), listnum],
        ['zremrangebyscore', listkey, '-inf', texpiry.toString()],
        ['zcard', listkey]
      ]).exec(function(err, results) {
        if (err) {
          next(err);
        } else {
          // num is the number of calls in the last expiry time window
          var num = parseInt(results[2][1], 10);
          if (num <= maxnum) {
            // does not reach limit
            next(null, false, num, exp);
          } else {
            // limit surpassed
            next(null, true, num, exp);
          }
        }
      });
    }
  });
};

这里是一种锁定式的速率限制器:

// Lockout window rate limiter
//
// key is a unique identifier for the process or function call being limited
// exp is the expiry in milliseconds
// maxnum is the number of function calls allowed within expiry time
var util_limiter_lockout = function(key, maxnum, exp, next) {
  // lockout rate limiter
  var idkey = 'limiter:lock:' + key;
  redis.incr(idkey, function(err, result) {
    if (err) {
      next(err);
    } else {
      if (result <= maxnum) {
        // still within number of allowable calls
        // - reset expiry and allow next function call
        redis.expire(idkey, exp, function(err) {
          if (err) {
            next(err);
          } else {
            next(null, false, result);
          }
        });
      } else {
        // too many calls, user must wait for expiry of idkey
        next(null, true, result);
      }
    }
  });
};

这里是函数的要点。如果您发现任何问题,请告诉我。


2
注意:以下代码是Java的示例实现。
private final String COUNT = "count";

@Autowired
private StringRedisTemplate stringRedisTemplate;
private HashOperations hashOperations;

@PostConstruct
private void init() {
    hashOperations = stringRedisTemplate.opsForHash();
}

@Override
public boolean isRequestAllowed(String key, long limit, long timeout, TimeUnit timeUnit) {
    Boolean hasKey = stringRedisTemplate.hasKey(key);
    if (hasKey) {
        Long value = hashOperations.increment(key, COUNT, -1l);
        return value > 0;
    } else {
        hashOperations.put(key, COUNT, String.valueOf(limit));
        stringRedisTemplate.expire(key, timeout, timeUnit);
    }
    return true;
}

2

这是我使用 Redis Lists 实现的漏桶算法速率限制代码。

注意:以下代码是php的示例实现,您可以用自己熟悉的编程语言实现它。

$list = $redis->lRange($key, 0, -1); // get whole list
$noOfRequests = count($list);
if ($noOfRequests > 5) {
    $expired = 0;
    foreach ($list as $timestamp) {
        if ((time() - $timestamp) > 60) { // Time difference more than 1 min == expired
            $expired++;
        }
    }
    if ($expired > 0) {
        $redis->lTrim($key, $expired, -1); // Remove expired requests
        if (($noOfRequests - $expired) > 5) { // If still no of requests greater than 5, means fresh limit exceeded.
            die("Request limit exceeded");
        }
    } else { // No expired == all fresh.
        die("Request limit exceeded");
    }
}
$redis->rPush($key, time()); // Add this request as a genuine one to the list, and proceed.

1
与其他Java解答类似,但减少了与Redis的往返次数:
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private HashOperations hashOperations;

    @PostConstruct
    private void init() {
        hashOperations = stringRedisTemplate.opsForHash();
    }

    @Override
    public boolean isRequestAllowed(String key, long limit, long timeout, TimeUnit timeUnit) {
        Long value = hashOperations.increment(key, COUNT, 1l);
        if (value == 1) {
            stringRedisTemplate.expire(key, timeout, timeUnit);
        }
        return value > limit;
    }

1

你的更新是非常好的算法,尽管我做了一些修改:

times = LLEN counter
if times < 5
    LPUSH counter now()
else
    time = LINDEX counter -1
    if now() - time <= 60
        print "Exceeded the limit"
    else
        LPUSH counter now()
        RPOP counter

7
你为什么做出那个改变?你背后的推理是什么? - Matrix

0
上一个时间段内的请求数 / 滑动窗口
间隔 == 接受请求(吞吐量)的时间量
吞吐量 == 每个间隔内的请求数
RequestTimeList == 将每个请求时间添加到此列表中
// Remove older request entries
while (!RequestTimeList.isEmpty() && (now() - RequestTimeList.get(0)) > interval) {

    RequestTimeList.remove(0)
}

if (RequestTimeList.length < throughput) {

    RequestTimeList.add(now())

} else {

    throw err;
}

时间间隔/固定窗口中的请求

我尝试了LIST、EXPIRE和PTTL。

如果每秒钟tps为5,则
吞吐量= 5
rampup = 1000(1000毫秒= 1秒)
间隔= 200毫秒

local tpsKey = KEYS[1]
local throughput = tonumber(ARGV[1])
local rampUp = tonumber(ARGV[2])
-- Minimum interval to accept the next request.
local interval = rampUp / throughput
local currentTime = redis.call('PTTL', tpsKey)

--  -2 if the key does not exist, so set an year expiry
if currentTime == -2 then
    currentTime = 31536000000 - interval
    redis.call('SET', tpsKey, 31536000000, "PX", currentTime)
end

local previousTime = redis.call('GET', tpsKey)

if (previousTime - currentTime) >=  interval then
       redis.call('SET', tpsKey, currentTime, "PX", currentTime)
       return true
else
       redis.call('ECHO',"0. ERR - MAX PERMIT REACHED IN THIS INTERVAL")
       return false
end

使用List的另一种方式

local counter = KEYS[1]
local throughput = tonumber(ARGV[1]) 
local rampUp = tonumber(ARGV[2])
local interval = rampUp / throughput
local times = redis.call('LLEN', counter)

if times == 0 then
    redis.call('LPUSH', counter, rampUp)
    redis.call('PEXPIRE', counter, rampUp)
    return true
elseif times < throughput then
    local lastElemTTL = tonumber(redis.call('LINDEX', counter, 0))
    local currentTTL = redis.call('PTTL', counter)
    if  (lastElemTTL-currentTTL) < interval then
        return false
    else
        redis.call('LPUSH', counter, currentTTL)
        return true
     end
else
    return false
end

0

这里有一种替代方法。如果目标是限制每个用户在Y秒内向X请求的数量,并且计时器从第一个请求接收时开始,则可以为要跟踪的每个用户创建两个键:一个用于记录第一个请求接收的时间,另一个用于记录请求次数。

key = "123"
key_count = "ct:#{key}"
key_timestamp = "ts:#{key}"

if (not redis[key_timestamp].nil?) && (not redis[key_count].nil?) && (redis[key_count].to_i > 3)
    puts "limit reached"
else
    if redis[key_timestamp].nil?
        redis.multi do
            redis.set(key_count, 1)
            redis.set(key_timestamp, 1)
            redis.expire(key_timestamp,30)
        end
    else
        redis.incr(key_count)
    end
    puts redis[key_count].to_s + " : " + redis[key_timestamp].to_s + " : " + redis.ttl(key_timestamp).to_s
end

这个算法中键的数量会减少吗?似乎不会。 - abdelrahman-sinno

0

这个内容很小,你可能不需要对它进行哈希处理。

local f,k,a,b f=redis.call k=KEYS[1] a=f('incrby',k,ARGV[1]) b=f('pttl',k) if b<0 then f('pexpire',k,ARGV[2]) end return a

参数如下:

KEYS[1] = 键名,例如可以是限制速率的操作
ARGV[1] = 增加的数量,通常为1,但您可以在客户端每10或100毫秒间隔批量处理
ARGV[2] = 以毫秒为单位的时间窗口,用于限制速率
返回值:新增的值,然后可以将其与代码中的值进行比较,以查看是否超过了速率限制。

使用此方法不会将ttl设置回基础值,它将继续滑动,直到键过期,然后在下一次调用时使用ARGV[2] ttl重新开始。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接