Java执行器带有限流/吞吐量控制

35
我正在寻找一个Java Executor,它能够指定限制节流/吞吐量/速率控制,例如,在一秒钟内最多只能处理100个任务--如果提交更多的任务,则它们应该被排队并在以后执行。主要目的是避免在访问外部API或服务器时达到限制。
我想知道是否基于Java本身(我怀疑,因为我已经检查过了),或者其他可靠的地方(例如Apache Commons)提供此功能,或者是否必须编写自己的代码。最好使用轻量级的框架。我不介意自己编写代码,但如果有“标准”版本存在,我至少想先看一下。
6个回答

41

看一下瓜娃子(RateLimiter):

速率限制器。在概念上,速率限制器按可配置的速率分发许可证。每次调用 acquire() 都会阻塞,直到有一个许可证可用,然后获取它。获取到许可证后,不需要释放。速率限制器通常用于限制对某个物理或逻辑资源的访问速率。这与 Semaphore 不同,后者限制并发访问数量而不是速率(尽管并发和速率密切相关,例如,参见 Little's Law)。

它是线程安全的,但仍然带有 @Beta 标志。 无论如何,值得一试。

您需要将每个调用包装在关于速率限制器的方面。 对于更清晰的解决方案,您可以为 ExecutorService 创建某种包装器。

来自 javadoc:

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }

是的,谢谢,这正是我在寻找的。@Beta有点烦人,但还是很不错的。 - mrip
我对所提出的解决方案有一些担忧。在上面的例子中,速率限制器会将提交限制到执行器服务,但是线程池会应用自己的执行策略,并可能同时启动所有任务执行。 - pditommaso
@pditommaso 不会出现这种情况,因为我们不会一次性将任务提交给执行器。rateLimiter.acquire(); // 可能会等待,会使任务在提交给执行器之前等待。因此,只有“允许”的任务数量才会存在于执行器队列中。 - Rishi
但是,如果第一个任务每个都需要超过0.5秒的时间,执行器队列将会积累,并且可以以更高的速率执行(如果后续任务更快),对吗?因此,这并不能保证始终以每秒2次的最大速率运行,只是平均而言。 - Wonko

8
Java Executor并没有提供这样的限制,只有线程数量的限制,这不是您要寻找的。通常情况下,Executor不是限制此类操作的正确位置,应该在线程尝试调用外部服务器时进行限制。例如,您可以通过使用Semaphore来实现限制,线程在提交请求之前等待它。调用线程:
public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

同时,您需要安排一个(单个)次要线程定期释放已获取的资源(例如每60秒一次):

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }

1
谢谢你的回答。你提出了一个很好的观点,但我并不认为Executor是强加这些限制的错误位置。你所描述的逻辑是我设想的封装在执行器中的逻辑。既然我无论如何都要使用执行器,我认为能够编写类似new ThrottlingExecutor().setThrottlingRule(new MaxPerSecond(100))这样的代码会是一个相当干净的解决方案。 - mrip
我同意mrip的观点。如果你使用CompletableFuture,那么唯一可以实现这样速率限制的地方是在Executor内部。 - Gili

4

每秒最多只能处理100个任务--如果提交的任务超过100个,则应该将它们排队并稍后执行

您需要查看Executors.newFixedThreadPool(int limit)。这将允许您限制可以同时执行的线程数。如果提交多个线程,它们将被排队并稍后执行。

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);  
...  

上面的片段展示了如何使用 ExecutorService 来控制同时执行的线程数量不超过 100 个。
更新: 在查看评论后,我想到了一个有点愚蠢的方法。我们可以手动记录要执行的线程,先将它们存储在 ArrayList 中,然后根据在最近一秒钟内已经执行的线程数量来提交它们给 Executor。 例如,如果已经向我们维护的 ArrayList 中提交了 200 个任务,则我们可以遍历,并将其中的 100 个添加到 Executor 中。当一秒钟过去时,我们可以根据 Executor 中已完成的线程数再添加一些线程,以此类推。

1
这并没有回答楼主的问题。他不是在问如何限制线程数,而是在问如何通过任何机制限制Executor处理任务的速率。 - chrylis -cautiouslyoptimistic-
1
感谢您的回答。对于我的目的,固定线程池的问题在于(a)我不需要很多线程,可能只需要一个,(b)它仍然不能限制每秒执行的次数。特别是在任务非常快速执行的情况下(例如向服务器请求一些数据)。无论是固定线程池还是Java的计划执行器似乎都没有提供这个功能,至少不是自然的方式。 - mrip
@mrip 如果我发了一些没有帮助到你的内容,我深感抱歉。很抱歉浪费了你的时间。请阅读更新并查看是否有所帮助。 - An SO User

1

根据情况,正如之前的回答所建议的那样,ThreadPoolExecutor的基本功能可能会有所帮助。

但是,如果线程池被多个客户端共享,并且您想要限制每个客户端的使用以确保一个客户端不会使用所有线程,则BoundedExecutor将起到作用。

更多细节可以在以下示例中找到:

http://jcip.net/listings/BoundedExecutor.java


0

个人认为这种情况非常有趣。在我的情况下,我想强调的是有趣的阻塞阶段是消费方面的,就像经典的生产者/消费者并发理论一样。这与之前提出的一些建议答案相反。也就是说,我们不想阻塞提交线程,而是基于速率(任务/秒)策略阻塞消费线程。因此,即使队列中有准备好的任务,执行/消费线程也可能会阻塞等待满足限制策略。

话虽如此,我认为一个很好的选择是Executors.newScheduledThreadPool(int corePoolSize)。这样,您需要在执行器前面放置一个简单的队列(一个简单的LinkedBlockingQueue就可以),然后安排定期任务从队列中获取实际任务(ScheduledExecutorService.scheduleAtFixedRate)。因此,这不是一个直接的解决方案,但如果您尝试像之前讨论的那样限制消费者,它应该表现得足够好。


0

可以在Runnable内部进行限制:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

JAVA 线程去抖动和节流中获取


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接