我需要一个组件/类,将某个方法的执行速率限制为每N秒(或毫秒或纳秒)最多M次。
换句话说,我需要确保在一个滑动窗口内,该方法的执行不超过M次。
如果您不知道现有的类,请随意发布您实现这一要求的解决方案/想法。
我需要一个组件/类,将某个方法的执行速率限制为每N秒(或毫秒或纳秒)最多M次。
换句话说,我需要确保在一个滑动窗口内,该方法的执行不超过M次。
如果您不知道现有的类,请随意发布您实现这一要求的解决方案/想法。
我会使用一个大小为M的时间戳环形缓冲区。每次方法调用时,检查最老的条目,如果它在过去的N秒内,则执行并添加另一个条目,否则睡眠到时间差。
对于我来说直接使用的是Google Guava RateLimiter。
// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);
private void someMethod() {
throttle.acquire();
// Do something
}
tryAquire()
。 - slfRatelimiter.create(1.0/60)
和 acquire()
可以实现每分钟1个调用。 - bizentassDelayQueue
来实现此操作。使用延迟为零的 M
个 Delayed
实例初始化队列。当方法收到请求时,使用 take
方法获取一个令牌,导致该方法阻塞直到满足节流要求。当获取到一个令牌时,使用 add
方法在队列中添加一个延迟为 N
的新令牌。阅读关于令牌桶(Token bucket)算法的相关内容。基本上,您有一个带有令牌的桶,每次执行方法时就会消耗一个令牌。如果没有令牌了,就会阻塞直到获取令牌。同时,在固定时间间隔内,有某个外部参与者会不断补充令牌。
我不知道是否有库可以实现这样的功能(或类似的功能)。您可以将此逻辑编写到您的代码中,或使用AspectJ来添加此行为。
import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;
RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);
boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");
public class LeakyBucket {
protected float maxRate;
protected long minTime;
//holds time of last action (past or future!)
protected long lastSchedAction = System.currentTimeMillis();
public LeakyBucket(float maxRate) throws Exception {
if(maxRate <= 0.0f) {
throw new Exception("Invalid rate");
}
this.maxRate = maxRate;
this.minTime = (long)(1000.0f / maxRate);
}
public void consume() throws InterruptedException {
long curTime = System.currentTimeMillis();
long timeLeft;
//calculate when can we do the action
synchronized(this) {
timeLeft = lastSchedAction + minTime - curTime;
if(timeLeft > 0) {
lastSchedAction += minTime;
}
else {
lastSchedAction = curTime;
}
}
//If needed, wait for our time
if(timeLeft <= 0) {
return;
}
else {
Thread.sleep(timeLeft);
}
}
}
minTime
是什么意思?它是用来做什么的?你能解释一下吗? - flashminTime
是在消耗一个令牌后需要等待的最短时间,才能够消耗下一个令牌。 - Duarte Meneses我下面的实现可以处理任意请求时间精度,每个请求的时间复杂度为O(1),不需要任何额外的缓冲区,例如O(1) 的空间复杂度,而且它不需要后台线程来释放令牌,相反,令牌是根据自上次请求以来经过的时间来释放的。
class RateLimiter {
int limit;
double available;
long interval;
long lastTimeStamp;
RateLimiter(int limit, long interval) {
this.limit = limit;
this.interval = interval;
available = 0;
lastTimeStamp = System.currentTimeMillis();
}
synchronized boolean canAdd() {
long now = System.currentTimeMillis();
// more token are released since last request
available += (now-lastTimeStamp)*1.0/interval*limit;
if (available>limit)
available = limit;
lastTimeStamp = now;
if (available<1)
return false;
else {
available--;
return true;
}
}
}
lastTimeStamp
,因此在有完整令牌之前,你调用得越频繁 - 你就会更快地累积令牌分数,而不管实际经过了多少时间。例如:我使用(2,1000)
进行初始化,然后等待100ms并快速调用5次 - 即使只经过了略微超过100ms,第5次调用也将被允许。 - Guss这是对上面的LeakyBucket代码的更新。 可以处理每秒超过1000个请求。
import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;
class LeakyBucket {
private long minTimeNano; // sec / billion
private long sched = System.nanoTime();
/**
* Create a rate limiter using the leakybucket alg.
* @param perSec the number of requests per second
*/
public LeakyBucket(double perSec) {
if (perSec <= 0.0) {
throw new RuntimeException("Invalid rate " + perSec);
}
this.minTimeNano = (long) (1_000_000_000.0 / perSec);
}
@SneakyThrows public void consume() {
long curr = System.nanoTime();
long timeLeft;
synchronized (this) {
timeLeft = sched - curr + minTimeNano;
sched += minTimeNano;
}
if (timeLeft <= minTimeNano) {
return;
}
TimeUnit.NANOSECONDS.sleep(timeLeft);
}
}
并且上述的单元测试:
import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class LeakyBucketTest {
@Test @Ignore public void t() {
double numberPerSec = 10000;
LeakyBucket b = new LeakyBucket(numberPerSec);
Stopwatch w = Stopwatch.createStarted();
IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
x -> b.consume());
System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
}
}
minTimeNano
是什么意思?你能解释一下吗? - flash