Limiting method calls to M requests in N seconds

167

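I need a component/class that throttles the execution of some method to at most M calls in N seconds (or milliseconds, or nanoseconds).

In other words, I need to make sure that the method is executed no more than M times within a sliding window of N seconds.

If you don't know of an existing class, feel free to post your own solution or ideas for implementing this.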
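To make the sliding-window requirement concrete, here is a minimal sketch of one possible approach: remember the timestamps of the calls that were admitted and reject a new call while M of them still fall inside the last N time units. The class name `SlidingWindowLimiter` and its methods are hypothetical, chosen for illustration only; they are not an existing library API. The window is treated as half-open, so a timestamp exactly N units old no longer counts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: at most maxCalls admitted calls in any window of windowNanos. */
final class SlidingWindowLimiter {
    private final int maxCalls;
    private final long windowNanos;
    // Timestamps of the calls admitted within the current window, oldest first.
    private final Deque<Long> admitted = new ArrayDeque<>();

    SlidingWindowLimiter(int maxCalls, long windowNanos) {
        if (maxCalls <= 0 || windowNanos <= 0) {
            throw new IllegalArgumentException("maxCalls and windowNanos must be positive");
        }
        this.maxCalls = maxCalls;
        this.windowNanos = windowNanos;
    }

    /** Non-blocking check against an explicit timestamp (convenient for tests). */
    synchronized boolean tryAcquire(long nowNanos) {
        // Drop timestamps that have slid out of the window.
        while (!admitted.isEmpty() && nowNanos - admitted.peekFirst() >= windowNanos) {
            admitted.pollFirst();
        }
        if (admitted.size() >= maxCalls) {
            return false; // the window is full, so the caller must wait or give up
        }
        admitted.addLast(nowNanos);
        return true;
    }

    /** Same check against the monotonic system clock. */
    boolean tryAcquire() {
        return tryAcquire(System.nanoTime());
    }
}
```

This keeps up to M timestamps in memory, which is fine for small M; the token-pool answers further down trade the strict sliding-window guarantee for constant memory.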


4
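This question has some good answers at https://dev59.com/D3RB5IYBdhLWcg3wSVm1. - skaffman
The original question sounds a lot like the problem solved in this blog post: Java Multi-Channel Asynchronous Throttler. For a rate of M calls in N seconds, the throttler discussed in that post guarantees that no interval of length N on the timeline contains more than M calls. - Hbf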
https://dev59.com/RljUa4cB1Zd3GeqPNQIn - kervin
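In addition, the same author has improved his implementation to guarantee that the order of calls is preserved: http://www.cordinc.com/blog/2010/06/ordered-java-multichannel-asyn.html - ikaerom
When you need locking in a distributed system, you can use Redis. See the second algorithm at https://redis.io/commands/incr. - Kanagavelu Sugumar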
15 Answers

0
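Try this simple approach:
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleThrottler {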

private static final int T = 1; // min
private static final int N = 345;

private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;

public SimpleThrottler() {
    handleForGate();
}

/**
 * Payload
 */
private void job() {
    try {
        Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.print(" J. ");
}

public void doJob() throws InterruptedException {
    lock.lock();
    try {

        while (true) {

            int count = 0;

            while (count < N && currentFrame) {
                job();
                count++;
            }

            newFrame.await();
            currentFrame = true;
        }

    } finally {
        lock.unlock();
    }
}

public void handleForGate() {
    Thread handler = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(1 * 900);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                currentFrame = false;

                lock.lock();
                try {
                    newFrame.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    handler.start();
}

}
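The frame idea used above (allow up to N jobs per time frame, then wait for the next frame) can also be expressed without a background thread. The following is only a sketch under that assumption, with hypothetical names (`FixedFrameCounter`, `tryAcquire`). Note that fixed frames are weaker than a true sliding window: up to 2N calls can land close together around a frame boundary.

```java
/** Hypothetical sketch: at most maxPerFrame calls per fixed frame of frameMillis. */
final class FixedFrameCounter {
    private final int maxPerFrame;
    private final long frameMillis;
    private long frameStart; // start of the current frame, in ms
    private int used;        // calls admitted in the current frame

    FixedFrameCounter(int maxPerFrame, long frameMillis, long startMillis) {
        if (maxPerFrame <= 0 || frameMillis <= 0) {
            throw new IllegalArgumentException("maxPerFrame and frameMillis must be positive");
        }
        this.maxPerFrame = maxPerFrame;
        this.frameMillis = frameMillis;
        this.frameStart = startMillis;
    }

    /** Returns true if the call fits into the frame that contains nowMillis. */
    synchronized boolean tryAcquire(long nowMillis) {
        long elapsed = nowMillis - frameStart;
        if (elapsed >= frameMillis) {
            // Jump to the frame containing nowMillis, keeping frames aligned.
            frameStart += (elapsed / frameMillis) * frameMillis;
            used = 0;
        }
        if (used >= maxPerFrame) {
            return false;
        }
        used++;
        return true;
    }
}
```

A caller would pass `System.currentTimeMillis()` for both `startMillis` and `nowMillis`; the explicit timestamp parameter is there only to keep the sketch easy to test.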


0

Apache Camel also supports a Throttler mechanism, as follows:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");

0
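Here is a rate limiter implementation based on the one by @tonywl (and somewhat related to Duarte Meneses's leaky bucket). The idea is the same: use a "token pool" to support both rate limiting and bursts (several calls in quick succession after a period of idleness).
This implementation has two main differences:
1. It uses atomic operations for lock-free concurrent access.
2. Instead of blocking the request, it calculates the delay needed to enforce the rate limit and returns it, leaving the caller to enforce the delay. This works better with asynchronous programming in modern network frameworks.
The full implementation and documentation can be found in this GitHub Gist, where I will also post updates, but here is the gist of it: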
import java.util.concurrent.atomic.AtomicLong;

public class RateLimiter {
    private final static long TOKEN_SIZE = 1_000_000 /* tockins per token */;
    private final double tokenRate; // measured in tokens per ms
    private final double tockinRate; // measured in tockins per ms
    private final long tockinsLimit;
    
    private AtomicLong available;
    private AtomicLong lastTimeStamp;

    /**
     * Create a new rate limiter with the token fill rate specified as
     * {@code fill}/{@code interval} and a maximum token pool size of {@code limit}, starting
     * with a {@code prefill} amount of tokens ready to be used.
     * @param prefill instead of starting with an empty pool, assume we "start from rest" and
     *                have tokens to consume. This value is clamped to {@code limit}.
     * @param limit The maximum number of tokens in the pool (burst size)
     * @param fill How many tokens will be filled in the pool by waiting {@code interval} time
     * @param interval How long will it take to get {@code fill} tokens back in the pool in ms
     */
    public RateLimiter(int prefill, int limit, int fill, long interval) {
        this.tokenRate = (double)fill / interval;
        this.tockinsLimit = TOKEN_SIZE * limit;
        this.tockinRate = tokenRate * TOKEN_SIZE;
        this.lastTimeStamp = new AtomicLong(System.nanoTime());
        // clamp prefill to limit, as documented above
        this.available  = new AtomicLong(Math.min(prefill, limit) * TOKEN_SIZE);
    }

    public boolean allowRequest() {
        return whenNextAllowed(1, false) == 0;
    }

    public boolean allowRequest(int cost) {
        return whenNextAllowed(cost, false) == 0;
    }

    public long whenNextAllowed(boolean alwaysConsume) {
        return whenNextAllowed(1, alwaysConsume);
    }
    
    /**
     * Check when will the next call be allowed, according to the specified rate.
     * The value returned is in milliseconds. If the result is 0 - or if {@code alwaysConsume} was
     * specified then the RateLimiter has recorded that the call has been allowed.
     * @param cost How costly is the requested action. The base rate is 1 token per request,
     *   but the client can declare a more costly action that consumes more tokens.
     * @param alwaysConsume if set to {@code true} this method assumes that the caller will delay
     *   the action that is rate limited but will perform it without checking again - so it will
     *   consume the specified number of tokens as if the action has gone through. This means that
     *   the pool can get into a deficit, which will further delay additional actions.
     * @return how many milliseconds before this request should be let through.
     */
    public long whenNextAllowed(int cost, boolean alwaysConsume) {
        var now = System.nanoTime();
        var last = lastTimeStamp.getAndSet(now);
        // calculate how many tockins we got since last call
        // if the previous call was less than a microsecond ago, we still accumulate at least
        // one tockin, which is probably more than we should, but this is too small to matter - right?
        var add = (long)Math.ceil(tokenRate * (now - last));
        var nowAvailable = available.addAndGet(add);
        while (nowAvailable > tockinsLimit) {
            available.compareAndSet(nowAvailable, tockinsLimit);
            nowAvailable = available.get();
        }

        // answer the question: how long until `cost` tokens are available?
        var needed = cost * TOKEN_SIZE;
        var toWait = (long)Math.ceil(Math.max(0, (needed - nowAvailable) / tockinRate));
        if (alwaysConsume || toWait == 0) // the caller will let the request go through, so consume the tokens now
            available.addAndGet(-needed);
        return toWait;
    }

}
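Since this limiter returns a delay instead of blocking, the caller has to enforce that delay. One way to do so, sketched here with a standard `ScheduledExecutorService`, is to schedule the task after the reported number of milliseconds. The class `DelayedDispatcher` is hypothetical and not part of the answer's code; it takes the delay from a `LongSupplier` so that the sketch stays self-contained.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical sketch: runs tasks after whatever delay a rate limiter reports. */
final class DelayedDispatcher {
    private final ScheduledExecutorService scheduler;
    private final LongSupplier delayMillis; // e.g. () -> limiter.whenNextAllowed(true)

    DelayedDispatcher(ScheduledExecutorService scheduler, LongSupplier delayMillis) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
    }

    /** Asks for the required delay, then schedules the task instead of blocking the caller. */
    ScheduledFuture<?> submit(Runnable task) {
        long delay = Math.max(0, delayMillis.getAsLong());
        return scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
    }
}
```

With the `RateLimiter` above, the supplier would be `() -> limiter.whenNextAllowed(true)`: passing `alwaysConsume = true` matches a caller that delays the action and then performs it without checking again.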

0
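Here is a slightly more advanced version of a simple rate limiter.
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded, the consume method blocks and waits for the current call rate to fall below the limit.
 */
public class RequestRateLimiter {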

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null, rate is not greater than zero, or a period bound is null");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    // named getCurRate to match the calls in the unit tests
    public synchronized float getCurRate(){
        return (float) ( 1000d / avgSpent);
    }
}

And the unit tests:

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void checkSingleThreadRate(){

        // 3 request per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) && ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0.2 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}

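The code is quite simple. Just create the limiter with a maxRate, or with periods and rates, and then call consume before each request. Whenever the rate is within the limit, the limiter returns immediately; otherwise it waits for a while before returning, to bring the current request rate down. It also has a current-rate method that returns a sliding average of the current rate. - Leonid Astakhov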

0
My solution: a simple utility method that you can modify to create a wrapper class.
public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
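            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run; done under the same lock
                // as the check above so that other threads see the update
                synchronized (this) {
                    _isWaiting = false;
                }
                // do the real task
                _realRunner.run();
            }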
        }};
    return throttleRunner;
}

Adapted from "Java Thread Debounce and Throttle".
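As a sketch of the wrapper class mentioned above, the same logic could be packaged as a small `Runnable` implementation. The name `ThrottledRunnable` is hypothetical. Like the utility method, it delays the first call by a fixed time, drops calls that arrive while one is already pending, and still runs the task if the wait is interrupted.

```java
/** Hypothetical sketch: wraps a Runnable so that overlapping calls collapse into one delayed run. */
final class ThrottledRunnable implements Runnable {
    private final Runnable delegate;
    private final long delayMillis;
    private boolean waiting; // guarded by this

    ThrottledRunnable(Runnable delegate, long delayMillis) {
        if (delegate == null || delayMillis < 0) {
            throw new IllegalArgumentException("delegate must be non-null and delayMillis non-negative");
        }
        this.delegate = delegate;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        synchronized (this) {
            if (waiting) {
                return; // another call is already pending, so drop this one
            }
            waiting = true;
        }
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            // Keep the interrupt status for the caller; the task still runs below.
            Thread.currentThread().interrupt();
        } finally {
            synchronized (this) {
                waiting = false; // later calls may schedule a new run
            }
        }
        delegate.run();
    }
}
```

Because the calling thread sleeps, this is best invoked from worker threads or an executor rather than from a latency-sensitive thread.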


Content provided by Stack Overflow.