在N秒内限制方法调用为M个请求

167

我需要一个组件/类,将某个方法的执行速率限制为每N秒(或毫秒或纳秒)最多M次。

换句话说,我需要确保在一个滑动窗口内,该方法的执行不超过M次。

如果您不知道现有的类,请随意发布您实现这一要求的解决方案/想法。


4
这个问题在 https://dev59.com/D3RB5IYBdhLWcg3wSVm1 有一些很好的回答。 - skaffman
原问题听起来很像这篇博客中解决的问题:Java 多通道异步限流器。对于在 N 秒内进行 M 次调用的速率,此博客中讨论的限流器保证时间线上任何长度为 N 的时间区间都不会包含超过 M 次的调用。 - Hbf
https://dev59.com/RljUa4cB1Zd3GeqPNQIn - kervin
此外,同一人已经改进了他的实现,以保证调用顺序的保留:http://www.cordinc.com/blog/2010/06/ordered-java-multichannel-asyn.html - ikaerom
当在分布式系统中需要锁定时,您可以使用Redis。请参考https://redis.io/commands/incr中的第二个算法。 - Kanagavelu Sugumar
显示剩余3条评论
15个回答

97

我会使用一个大小为M的时间戳环形缓冲区。每次方法调用时,检查最老的条目,如果它在过去的N秒内,则执行并添加另一个条目,否则睡眠到时间差。


5
太好了,正是我需要的。快速尝试显示实现只需大约10行,并且内存占用很小。只需要考虑线程安全和排队处理传入的请求即可。 - vtrubnikov
7
这就是为什么使用 java.util.concurrent 中的 DelayQueue。它可以防止多个线程同时处理同一条目的问题。 - erickson
6
对于多线程的情况,我认为令牌桶算法可能是更好的选择。 - Michael Borgwardt
1
你知道这个算法叫什么名字吗?如果有的话。 - Vlado Pandžić
2
@MichaelBorgwardt “如果它是过去不到N秒钟”,你的意思是“超过N秒钟”吗?难道你不是想要只有在给定时间过去后才调用吗? - oligofren

96

对于我来说直接使用的是Google Guava RateLimiter

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}

24
我不建议使用这个解决方案,因为Guava RateLimiter会阻塞线程,从而容易耗尽线程池。 - kaviddiss
24
如果你不想阻塞线程,那么请使用 tryAquire() - slf
9
目前的RateLimiter实现存在问题(至少对我来说),它不允许大于1秒的时间段,因此无法实现例如每分钟1次的速率。 - John B
5
据我所知,您可以通过使用RateLimiter.create(60.0)+rateLimiter.acquire(60)实现每分钟1个请求。 - divideByZero
3
@radiantRazor 的代码 Ratelimiter.create(1.0/60)acquire() 可以实现每分钟1个调用。 - bizentass
显示剩余6条评论

30
具体来说,您应该能够使用 DelayQueue 来实现此操作。使用延迟为零的 MDelayed 实例初始化队列。当方法收到请求时,使用 take 方法获取一个令牌,导致该方法阻塞直到满足节流要求。当获取到一个令牌时,使用 add 方法在队列中添加一个延迟为 N 的新令牌。

1
是的,这样做可以解决问题。但我不特别喜欢DelayQueue,因为它使用(通过PriortyQueue)平衡二叉哈希(这意味着在“offer”上有大量比较和可能的数组增长),而且对我来说有点沉重。我猜对于其他人来说,这可能完全没问题。 - vtrubnikov
5
在这个应用程序中,由于添加到堆中的新元素几乎总是堆中的最大元素(即具有最长延迟),因此通常每次添加只需要进行一次比较。此外,如果算法实现正确,则数组永远不会增长,因为仅在取出一个元素后才添加一个元素。 - erickson
3
我发现即使不想让请求出现大量突发,保持大小M和延迟N相对较小,以几毫秒为单位的情况下,这也很有帮助。例如,M = 5,N = 20毫秒将提供250次/秒的吞吐量,并使突发事件以5个请求为一组发生。 - FUD
这个方案能否支持百万级的每分钟请求数和允许并发请求?我需要添加一百万个delayedElements。同时,一些边缘情况下的延迟会很高 - 比如多个线程同时调用poll()方法时会导致每次都被锁住。 - Aditya Joshee
@AdityaJoshee 我没有对它进行基准测试,但如果我有时间,我会尝试了解一下开销。需要注意的一件事是,您不需要在1秒内过期的100万个令牌。您可以拥有100个在10毫秒内过期的令牌,10个在毫秒内过期的令牌等等。这实际上强制瞬时速率接近平均速率,平滑峰值,这可能会导致客户端出现备份,但这是速率限制的自然结果。 1百万RPM听起来并不像是限流。如果您能解释您的用例,我可能会有更好的想法。 - erickson

22

阅读关于令牌桶(Token bucket)算法的相关内容。基本上,您有一个带有令牌的桶,每次执行方法时就会消耗一个令牌。如果没有令牌了,就会阻塞直到获取令牌。同时,在固定时间间隔内,有某个外部参与者会不断补充令牌。

我不知道是否有库可以实现这样的功能(或类似的功能)。您可以将此逻辑编写到您的代码中,或使用AspectJ来添加此行为。


4
谢谢建议,算法很有趣。但它并不完全符合我的需求。例如,我需要将执行限制为每秒5次调用。如果我使用令牌桶,并且同时接收到10个请求,则前5个调用会获取所有可用的令牌并立即执行,而剩余的5个调用将在固定的1/5秒间隔后执行。在这种情况下,我需要在1秒钟后将剩余的5个调用一次性执行完毕。 - vtrubnikov
5
如果每秒往桶里添加5个代币(或者改为每1/5秒添加5 - (5-剩余数量)个代币),会发生什么? - Kevin
@Kevin,这仍然不能给我“滑动窗口”效果。 - vtrubnikov
2
@valery 是的,它会。 (但请记得将令牌限制在 M 以内) - nos
如果保留有关请求时间的元数据,一切都可以在单线程中完成,无需外部参与者。 - Marsellus Wallace

8
如果您需要一个基于Java的滑动窗口速率限制器,可以在分布式系统中运行,您可能需要查看https://github.com/mokies/ratelimitj项目。
一个Redis支持的配置,限制每个IP的请求为每分钟50个,将如下所示:
import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

请参阅https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis,了解有关Redis配置的更多详细信息。

5
这取决于应用程序。
假设多个线程想要一个令牌来执行某些全局速率限制的操作,而不允许突发情况发生(即您希望每10秒限制10个动作,但您不希望在第一秒发生10个动作,然后停止9秒)。
DelayedQueue的缺点是:线程请求令牌的顺序可能不是它们得到请求满足的顺序。如果有多个线程被阻塞等待令牌,则不清楚哪个线程将获得下一个可用令牌。在我看来,甚至可以让线程永远等待。
一种解决方案是在两个连续操作之间设置最小时间间隔,并按照请求的顺序执行操作。
以下是一种实现方法:
public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}

这里的 minTime 是什么意思?它是用来做什么的?你能解释一下吗? - flash
minTime 是在消耗一个令牌后需要等待的最短时间,才能够消耗下一个令牌。 - Duarte Meneses

5

我下面的实现可以处理任意请求时间精度,每个请求的时间复杂度为O(1),不需要任何额外的缓冲区,例如O(1) 的空间复杂度,而且它不需要后台线程来释放令牌,相反,令牌是根据自上次请求以来经过的时间来释放的。

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more token are released since last request
        available += (now-lastTimeStamp)*1.0/interval*limit; 
        if (available>limit)
            available = limit;

        lastTimeStamp = now;
        if (available<1)
            return false;
        else {
            available--;
            return true;
        }
    }
}

什么是“limit”? - steph643
@steph643,limit是速率限制阈值的值,即如果您想强制执行每秒5次的速率限制,则limit=5,interval=1s。 - tonywl
@tonywl 我认为你有一个bug - 当没有可用的令牌时,你没有更新lastTimeStamp,因此在有完整令牌之前,你调用得越频繁 - 你就会更快地累积令牌分数,而不管实际经过了多少时间。例如:我使用(2,1000)进行初始化,然后等待100ms并快速调用5次 - 即使只经过了略微超过100ms,第5次调用也将被允许。 - Guss
1
未来的人们 - 我上面的评论是在代码修复之前发表的。现在已经修复了。 - Guss

3
虽然这不是你要求的,但是ThreadPoolExecutor设计用于限制同时请求到M个而非N秒内的M个请求,也可能会有用。

2
我已经实现了一种简单的限流算法。请尝试此链接:http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html 关于这个算法,它利用了Java的Delayed Queue的能力。创建一个delayed对象,并设置期望的延迟时间(这里是1000/M毫秒TimeUnit)。将同样的对象放入延迟队列中,这将提供移动窗口。在每个方法调用之前,从队列中取出对象,take是一个阻塞调用,仅在指定延迟后返回,在方法调用之后不要忘记更新对象的时间并将其放回队列中(当前毫秒数)。
在这里,我们还可以使用具有不同延迟的多个延迟对象。这种方法也将提供高吞吐量。

6
你应该发布算法的摘要。如果你的链接失效,那么你的回答就毫无意义了。 - j.w.r
谢谢,我已经添加了简介。 - Devas

1

这是对上面的LeakyBucket代码的更新。 可以处理每秒超过1000个请求。

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // sec / billion
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

并且上述的单元测试:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}

这里的 minTimeNano 是什么意思?你能解释一下吗? - flash

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接