什么是一种好的速率限制算法?

178

我需要一些伪代码,或者更好的是Python代码。 我正在尝试为Python IRC机器人实现一个速率限制队列,它部分工作正常,但如果有人触发的消息少于限制(例如,速率限制为每8秒5条消息,而该人只触发了4条),并且下一次触发在8秒之后(例如,16秒后),机器人会发送消息,但队列变满了,机器人将等待8秒,即使不需要等待,因为8秒时间已过。

12个回答

267

如果你只想在消息到达过快时删除它们(而不是将它们排队,这是有意义的,因为队列可能变得任意大),那么这里是最简单的算法

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

这个解决方案中没有数据结构、定时器等,而且它工作得很干净 :) 要看到这一点,“津贴”最多以5/8单位每秒的速度增长,即每8秒钟最多增加5个单位。每转发一条消息就会扣除一个单位,因此每8秒钟内不能发送超过5条消息。

请注意,rate 应该是一个整数,即没有非零小数部分,否则算法将无法正确工作(实际速率将不是 rate/per)。例如,rate=0.5; per=1.0; 不起作用,因为 allowance 永远不会增长到 1.0。但是,rate=1.0; per=2.0; 可以正常工作。


5
值得一提的是,“time_passed”所代表的时间维度和规模必须与“per”相同,例如单位为秒。 - skaffman
2
嗨skaffman,谢谢你的赞美---我随口就说出来了,但有99.9%的可能性是之前有人想出了类似的解决方案 :) - Antti Huima
57
这是一个标准算法——令牌桶,没有队列。桶的内容是“允许的令牌数”,桶的大小是“速率”。代码中的“allowance +=…”这一行是优化方法,每过per秒就会添加一个令牌,相当于每秒添加rate÷per个令牌。 - derobert
5
@zwirbeltier,您上面所写的并不正确。'Allowance'总是由'rate'限制(请看"// throttle"行),因此它只会在任何特定时间内允许恰好'rate'条消息的突发,即5。 - Antti Huima
8
这很好,但速率可以超过限制。例如,在时间0你发送了5条消息,然后在时间N*(8/5)(其中N=1, 2……)时,你可以再发送一条消息,这将导致在8秒的时间内发送超过5条消息。 - mindvirus
显示剩余11条评论

52

在你的函数之前使用@RateLimited(ratepersec)装饰器来限制速率。

基本上,它会检查自上次检查以来是否已经过去了1/rate秒,如果没有,则等待剩余的时间,否则不等待。这有效地将速率限制为rate/sec。可以将该装饰器应用于希望受到速率限制的任何函数。

对于您的情况,如果您想每8秒最多发送5条消息,请在您的sendToQueue函数之前使用@RateLimited(0.625)。

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

9
这是一个列表,因为像 float 这样的简单类型被闭包捕获时是常量。通过将其制作成列表,列表是常量,但其内容不是。是的,它不是线程安全的,但可以通过锁定轻松解决。 - Carlos A. Ibarra
1
time.clock() doesn't have enough resolution on my system, so I adapted the code and changed to use time.time() - mtrbean
4
对于速率限制,绝对不要使用 time.clock(),因为它测量的是经过的CPU时间。 CPU时间可以比“实际”时间快得多或慢得多。 相反,您应该使用 time.time(),它测量墙上的时间(“实际”时间)。 - John Wiseman
3
顺便提一句,对于真正的生产系统来说,在实现速率限制时使用sleep()调用可能不是一个好主意,因为它将阻塞线程,从而防止其他客户端使用它。 - Maresh
更新 lastTimeCalled 时,应该使用锁进行保护,对吗? - yueq
显示剩余5条评论

30
一个令牌桶(Token Bucket)的实现比较简单。
开始时有一个带有5个令牌的桶。
每五分之八秒:如果桶中令牌少于5个,则添加一个令牌。
每当你想要发送消息时:如果桶里有≥1个令牌,则取出一个令牌并发送消息。否则,等待/丢弃消息/执行其他操作。
(显然,在实际代码中,您将使用整数计数器代替真正的令牌,并且可以通过存储时间戳来优化每5/8秒的步骤)
阅读问题后,如果速率限制每8秒完全重置,则进行修改:
在很久以前的某个时间(例如在时代之初)开始一个名为last_send的时间戳,并使用相同的5个令牌桶。
去掉每五分之八秒的规则。
每次发送消息时:首先检查last_send是否≥ 8秒前。如果是,则填充桶(将其设置为5个令牌)。第二,如果桶中有令牌,则发送消息(否则,丢弃/等待等)。第三,将last_send设置为当前时间。
这应该适用于该场景。
我实际上已经使用此类策略编写了一个IRC机器人(使用第一种方法实现)。它是用Perl而不是Python编写的,但这里是一些代码以说明:此处的第一部分处理向桶中添加令牌。您可以看到基于时间添加令牌的优化(倒数第二行),然后最后一行将桶内容夹紧到最大值(MESSAGE_BURST)。
    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn是一个传递的数据结构。这是在定期运行的方法内部(它计算下一次需要做些什么,然后等待相应的时间或直到收到网络流量)。该方法的下一部分处理发送。由于消息具有与之关联的优先级,因此它相当复杂。

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,无论如何都会运行。即使它因洪水而杀死了我们的连接。用于极其重要的事情,例如响应服务器的PING。接下来,是其余的队列:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,存储桶状态被保存回$conn数据结构(实际上在方法的稍后部分;它首先计算它将有更多工作的时间有多长)。

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

正如你所看到的,实际的桶处理代码非常少,大约只有四行。其余的代码是优先级队列处理。这个机器人具有优先级队列,以便例如与它聊天的人无法阻止它执行重要的踢出/封禁职责。


我有什么遗漏吗...看起来这会限制你在通过前5个之后每8秒只能发送1条消息 - chills42
@chills42:是的,我看错了问题……请看答案的后半部分。 - derobert
@chills:如果上次发送的时间小于8秒,则不向桶中添加令牌。如果您的桶中有令牌,则可以发送消息;否则,您无法发送消息(因为在过去的8秒钟内已经发送了5条消息)。 - derobert
3
如果给这篇文章点踩的人能解释一下原因,我会非常感激。我想修复你们看到的任何问题,但是没有反馈很难做到! - derobert
@derobert,您能否在https://stackoverflow.com/questions/72699358/how-to-rate-limit-existing-xml-based-api-without-much-code-change-at-client-side上给予指导? - SVK

12

为了阻塞处理,直到消息可以被发送,从而排队等待进一步的消息,Antti的完美解决方案也可以进行如下修改:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等待足够的余量来发送消息。为了避免两倍速率的启动,余量也可以初始化为0。


5
当你睡觉时,应该将 (1-津贴比例) * (每小时费率) 的金额加到 上次结算 中。 - Alp

3

一种解决方案是给每个队列项附加一个时间戳,并在8秒后丢弃该项。每次添加队列时都可以执行此检查。

只有在队列大小限制为5并且队列已满时才会丢弃任何添加项,这种方法才有效。


2
保留最后五行消息发送的时间。将排队的消息保留,直到第五个最近的消息(如果存在)至少过去了8秒钟(使用last_five作为时间数组)。
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

自从你修改过它之后,我就不是了。 - Pesto
你正在存储五个时间戳,并通过内存重复移位(或执行链表操作)。我只存储一个整数计数器和一个时间戳。并且只进行算术和赋值操作。 - derobert
2
除了我的程序在尝试发送5行但时间段内只允许3行时会更好地运行。你的程序将允许发送前三行,并在发送第4和第5行之前强制等待8秒钟。我的程序将允许在第四和第五最近的行之后8秒钟发送第4和第5行。 - Pesto
1
但是就性能而言,可以通过使用长度为5的循环链表来改进,指向第五个最近的发送,覆盖它并在新的发送时将指针向前移动一个。 - Pesto
@Pesto:没错,爆发性确实不同。从任何一种方法中都可以轻松获得所需的行为。想要哪种行为取决于服务器如何实现其洪水限制。 - derobert
显示剩余2条评论

2

如果还有人感兴趣的话,我会使用这个简单的可调用类与定时LRU键值存储结合使用,以限制每个IP的请求速率。使用一个双端队列,但可以重新编写为使用列表。

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

1

这只是一个Python实现的接受答案中的代码。

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

有人向我建议,建议您添加代码的使用示例。 - Luc

0
我需要在Scala中进行变化。这是它:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: AB) extends (AB) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

以下是如何使用它的方法:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

0

另一种解决方案

from collections import deque
from datetime import timedelta
from time import sleep

class RateLimiter:
    def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
        self.items = items
        self.per = per
        self.deque = deque(maxlen=items)

    def count(self):
        now = datetime.now()
        self.deque.append(now)

    def time_to_wait(self) -> timedelta:
        if len(self.deque) < self.deque.maxlen:
            return timedelta(0)
        now = datetime.now()
        per = now - self.deque[0]
        return max(timedelta(0), self.per - per)

    def throttle(self):
        sleep(self.time_to_wait().total_seconds())
        self.count()

if __name__ == '__main__':
    rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))

    for i in range(10):
        rate_limiter.throttle()
        print(f'{i}')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接