Java限流机制

3
更新:我使用的是Java 1.6.34版本,且无法升级到Java 7。

我有一个场景,只允许每分钟调用一个方法不超过80次。这实际上是由第三方编写的服务API,如果您调用次数太多,它会“关闭”(忽略调用)其API:

public class WidgetService {
    // Can only call this method 80x/min, otherwise it
    // it just doesn't do anything
    public void doSomething(Fizz fizz);
}

我想编写一个名为ApiThrottler的类,其中包含一个boolean canRun()方法,用于告诉我的Java客户端是否可以调用doSomething(Fizz)方法。 (当然,它始终可以被调用,但如果我们超过了速率,调用它是没有意义的。)
所以,这样的东西将允许我编写以下代码:
// 80x/min
ApiThrottler throttler = new ApiThrottler(80);

WidgetService widgetService = new DefaultWidgetService();

// Massive list of Fizzes
List<Fizz> fizzes = getFizzes();

for(Fizz fizz : fizzes)
    if(throttler.canRun())
        widgetService.doSomething(fizz);

这并不一定要使用API(ApiThrottler#canRun),但我需要一个可靠的机制来暂停/休眠,直到可以调用WidgetService#doSomething(Fizz)

这让我感觉我们正在进入使用多个线程的领域,这让我感觉我们可以使用某种锁定机制和Java通知(wait()/notify())模型。但是由于没有在这个领域的经验,我无法想出最优雅的解决方案。提前致谢。

4个回答

5

可能最好的选择之一是使用Semaphore类http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html,并在每分钟内给它80个许可。这可以通过使用计时器类http://docs.oracle.com/javase/7/docs/api/java/util/Timer.html来实现。

调用线程将通过在信号量上调用acquire()消耗许可证,如果所有许可证都已耗尽,则该调用将被阻止。

当然,也可以使用计时器或单独的线程、wait/notify和整数计数器编写代码,但与我上面概述的更现代的java.util.concurrent API相比,这将更加复杂。

代码可能如下所示:

class Throttler implements TimerTask {
  final Semaphore s = new Semaphore(80);  
  final Timer timer = new Timer(true);

  Throttler() {
    timer.schedule(this, 0, 60*1000);   //schedule this for 1 min execution  
  }

  run() {  //called by timer 
    s.release(80 - s.availablePermits());
  }

  makeCall() {
    s.acquire();
    doMakeCall();
  }

}

这应该从Java 5开始就可以工作。

另外,更好的解决方案是使用Guava中的com.google.common.util.concurrent.RateLimiter。它可以像这样:

class Throttler {
  final RateLimiter rateLimiter = RateLimiter.create(80.0/60.0);

  makeCall() {
    rateLimiter.acquire();
    doMakeCall();
  }
}

语义学与信号量解决方案略有不同,而RateLimiter很可能更适合您的情况。

谢谢@hgrey (+1) - 有没有可能看到一些伪代码?我很难想象调用线程会是什么样子,以及它如何与计时器线程通信。再次感谢! - IAmYourFaja
太好了! 再次感谢 - 只有一个最后的问题:我假设doMakeCall是我将调用widgetService.doSomething(Fizz)的地方?我需要使用哪些Java类来安排1分钟执行的Caller?非常感谢到目前为止提供的所有帮助! - IAmYourFaja
2
不要试图耗尽所有许可证,然后释放80,你可能只想释放80 - s.availablePermits()并避免竞争许可证。 - bowmore

1

我最近写了类似这样的东西。唯一的变化是我的代码在函数运行完成后期望一个回调。所以如果它无法运行,我直接调用回调。

另一个变化是,由于这个调用可能是异步的,当第二次调用时,它可能仍在进行中。在这种情况下,我只忽略了这个调用。

我的节流器有一个名为call的辅助函数,它接受要调用的函数和回调。这是在C++中实现的,对于你来说,它将采用Action和listener接口的形式。

与基于信号量的解决方案相比,这具有优势,因为它可以序列化请求,使您不会过于频繁地调用它

interface Callback{
    public void OnFunctionCalled();
}

class APIThrottler
   //ctor etc
   boolean CanCall();

   public boolean IsInProgress();

   public void SetInProgress(boolean inProgress = true);

   public void Mark(){/*increment counter*/; SetInProgress(false);} // couldnt think of a better name..

   public void Call(Callable event, Callback callback){
        If(IsInProgress())
            return;
        else if(CanCall())
        {
            SetInProgress();
            event.Call();
        }
        else
            callback.OnFunctionCalled();

    }
}

一旦函数回调(或者如果它是同步的,在函数本身内部),您需要在完成后Mark()

这基本上是我的实现,唯一的区别是我处理的是每x秒(或分钟)一次。


谢谢@Karthik T (+1) - 你能更新你的答案并提供一些伪代码,这样我就能看到API的外观了吗?再次感谢! - IAmYourFaja
我看到了2个+1,但只有一个实际的赞:P。让我看看能否写出一些伪代码。提前原谅我相对不熟悉Java的情况。 - Karthik T
@DirtyMikeAndTheBoys 我已经更新了一些框架代码。当函数回调时,您需要执行 Mark() - Karthik T

0

这可以通过使用一个简单的计数器来实现。我们称之为“许可”计数器,初始值为80。在调用“函数”之前,首先尝试获取许可。完成后,请释放它。然后设置一个重复定时器,每秒将计数器重置为80。

class Permit extends TimerTask
{
    private Integer permit = 80;

    private synchronized void resetPermit() { this.permit = 80; }

    public synchronized boolean acquire() {
        if(this.permit == 0) return false;
        this.permit--;
        return true;
    }

    public synchronized void release() { this.permit++; }

    @Override public void run() { resetPermit(); }
}

设置定时器以每秒重置许可,如下所示。schedule方法的第一个参数是TimerTask的实例(传递上面的Permit类对象)。run()方法将为第二个参数指定的每个时间段调用一次(这里是1000毫秒/1秒)。'true'参数表示这是一个守护进程定时器,将不断重复执行。
Timer timer = new Timer(true);
timer.schedule(permit, 1000);

每次需要调用函数时,首先检查是否可以获取任何许可。完成后不要忘记释放它。
void myFunction() {
  if(!permit.acquire()) {
      System.out.println("Nah.. been calling this 80x in the past 1 sec");
      return;
  }

  // do stuff here

  permit.release();
}

请注意上面在Permit类方法中使用synchronized关键字的用法--这应该避免多个线程同时执行同一对象实例的任何方法。

0

您可以记录时间,并确保在最近一分钟内不超过80个记录。

// first=newest, last=oldest
final LinkedList<Long> records = new LinkedList<>();

synchronized public void canRun() throws InterruptedException
{
    while(true)
    {
        long now = System.currentTimeMillis();
        long oneMinuteAgo = now - 60000;

        // remove records older than one minute
        while(records.getLast() < oneMinuteAgo)
            records.removeLast();

        if(records.size()<80) // less than 80 records in the last minute
        {
            records.addFirst(now);
            return;   // can run
        }

        // wait for the oldest record to expire, then check again
        wait( 1 + records.getLast() - oneMinuteAgo);
    }
}

在第0秒,我们可能会发出80个调用,等待一分钟,然后在第60秒时再发出另外80个调用。由于两端的时钟不准确或网络随机延迟,对方可能会在一分钟内测量到160个调用。为了安全起见,扩大时间窗口,例如每70秒限制80个调用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接