我需要一些伪代码,或者更好的是Python代码。 我正在尝试为Python IRC机器人实现一个速率限制队列,它部分工作正常,但如果有人触发的消息少于限制(例如,速率限制为每8秒5条消息,而该人只触发了4条),并且下一次触发在8秒之后(例如,16秒后),机器人会发送消息,但队列变满了,机器人将等待8秒,即使不需要等待,因为8秒时间已过。
我需要一些伪代码,或者更好的是Python代码。 我正在尝试为Python IRC机器人实现一个速率限制队列,它部分工作正常,但如果有人触发的消息少于限制(例如,速率限制为每8秒5条消息,而该人只触发了4条),并且下一次触发在8秒之后(例如,16秒后),机器人会发送消息,但队列变满了,机器人将等待8秒,即使不需要等待,因为8秒时间已过。
如果你只想在消息到达过快时删除它们(而不是将它们排队,这是有意义的,因为队列可能变得任意大),那么这里是最简单的算法:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
这个解决方案中没有数据结构、定时器等,而且它工作得很干净 :) 要看到这一点,“津贴”最多以5/8单位每秒的速度增长,即每8秒钟最多增加5个单位。每转发一条消息就会扣除一个单位,因此每8秒钟内不能发送超过5条消息。
请注意,rate
应该是一个整数,即没有非零小数部分,否则算法将无法正确工作(实际速率将不是 rate/per
)。例如,rate=0.5; per=1.0;
不起作用,因为 allowance
永远不会增长到 1.0。但是,rate=1.0; per=2.0;
可以正常工作。
在你的函数之前使用@RateLimited(ratepersec)装饰器来限制速率。
基本上,它会检查自上次检查以来是否已经过去了1/rate秒,如果没有,则等待剩余的时间,否则不等待。这有效地将速率限制为rate/sec。可以将该装饰器应用于希望受到速率限制的任何函数。
对于您的情况,如果您想每8秒最多发送5条消息,请在您的sendToQueue函数之前使用@RateLimited(0.625)。
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
time.clock()
doesn't have enough resolution on my system, so I adapted the code and changed to use time.time()
- mtrbeantime.clock()
,因为它测量的是经过的CPU时间。 CPU时间可以比“实际”时间快得多或慢得多。 相反,您应该使用 time.time()
,它测量墙上的时间(“实际”时间)。 - John Wiseman my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn是一个传递的数据结构。这是在定期运行的方法内部(它计算下一次需要做些什么,然后等待相应的时间或直到收到网络流量)。该方法的下一部分处理发送。由于消息具有与之关联的优先级,因此它相当复杂。
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
这是第一个队列,无论如何都会运行。即使它因洪水而杀死了我们的连接。用于极其重要的事情,例如响应服务器的PING。接下来,是其余的队列:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
最后,存储桶状态被保存回$conn数据结构(实际上在方法的稍后部分;它首先计算它将有更多工作的时间有多长)。
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
正如你所看到的,实际的桶处理代码非常少,大约只有四行。其余的代码是优先级队列处理。这个机器人具有优先级队列,以便例如与它聊天的人无法阻止它执行重要的踢出/封禁职责。
为了阻塞处理,直到消息可以被发送,从而排队等待进一步的消息,Antti的完美解决方案也可以进行如下修改:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
它只是等待足够的余量来发送消息。为了避免两倍速率的启动,余量也可以初始化为0。
(1-津贴比例) * (每小时费率)
的金额加到 上次结算
中。 - Alp一种解决方案是给每个队列项附加一个时间戳,并在8秒后丢弃该项。每次添加队列时都可以执行此检查。
只有在队列大小限制为5并且队列已满时才会丢弃任何添加项,这种方法才有效。
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
如果还有人感兴趣的话,我会使用这个简单的可调用类与定时LRU键值存储结合使用,以限制每个IP的请求速率。使用一个双端队列,但可以重新编写为使用列表。
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
这只是一个Python实现的接受答案中的代码。
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
以下是如何使用它的方法:
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}
另一种解决方案
from collections import deque
from datetime import timedelta
from time import sleep
class RateLimiter:
def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
self.items = items
self.per = per
self.deque = deque(maxlen=items)
def count(self):
now = datetime.now()
self.deque.append(now)
def time_to_wait(self) -> timedelta:
if len(self.deque) < self.deque.maxlen:
return timedelta(0)
now = datetime.now()
per = now - self.deque[0]
return max(timedelta(0), self.per - per)
def throttle(self):
sleep(self.time_to_wait().total_seconds())
self.count()
if __name__ == '__main__':
rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))
for i in range(10):
rate_limiter.throttle()
print(f'{i}')