使用方法:
public static void main(String[] args) throws InterruptedException {
RateFriendlyThreadPoolExecutor executor = new RateFriendlyThreadPoolExecutor(3, 5, 1, SECONDS, new LinkedBlockingDeque<>(100));
executor.setRate(10);
executor.setMinRate(1);
executor.setMaxRate(100);
for (int i = 0; i < 1000; i++) {
int lap = i;
executor.execute(() -> System.out.printf("%03d (%s) - %s - %s%n", lap, executor.getRate(), LocalDateTime.now(), Thread.currentThread().getName()));
}
executor.shutdown();
executor.awaitTermination(60, SECONDS);
}
输出:
002 (10) - 2023-05-27T23:03:37.659658800 - pool-1-thread-3
000 (11) - 2023-05-27T23:03:37.659658800 - pool-1-thread-1
001 (11) - 2023-05-27T23:03:37.744859100 - pool-1-thread-2
105 (11) - 2023-05-27T23:03:37.930152500 - main
103 (12) - 2023-05-27T23:03:38.037876400 - pool-1-thread-4
104 (12) - 2023-05-27T23:03:38.130058800 - pool-1-thread-5
003 (12) - 2023-05-27T23:03:38.221655300 - pool-1-thread-3
004 (12) - 2023-05-27T23:03:38.314020700 - pool-1-thread-1
005 (12) - 2023-05-27T23:03:38.406202700 - pool-1-thread-2
006 (12) - 2023-05-27T23:03:38.573508200 - pool-1-thread-4
007 (13) - 2023-05-27T23:03:38.665875900 - pool-1-thread-5
008 (13) - 2023-05-27T23:03:38.742695200 - pool-1-thread-3
实施:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import static java.lang.Math.min;
import static java.lang.System.nanoTime;
public class RateFriendlyThreadPoolExecutor extends ThreadPoolExecutor {
private AtomicInteger rate = new AtomicInteger();
private AtomicInteger minRate = new AtomicInteger();
private AtomicInteger maxRate = new AtomicInteger();
private AtomicLong leapTime = new AtomicLong();
private ReentrantLock rateLock = new ReentrantLock();
public RateFriendlyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, (r, e) -> overflow(r, (RateFriendlyThreadPoolExecutor) e));
}
@Override
public void execute(Runnable rannable) {
super.execute(() -> executeWithDelay(rannable));
}
protected void executeWithDelay(Runnable runnable) {
int rateSnapshot = rate.get();
limitRate(rateSnapshot, leapTime, rateLock);
try {
runnable.run();
rate.compareAndSet(rateSnapshot, min(rateSnapshot + 1, maxRate.get()));
} catch (Exception e) {
if (!isThrottled(e))
throw e;
System.out.println("throttled at rate " + rateSnapshot);
rate.set(minRate.get());
execute(runnable);
}
}
public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
if (rate == 0)
return;
long targetLeapTime = 1_000_000_000 / rate;
rateLock.lock();
try {
long timeSnapshot = nanoTime();
long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
if (waitTime > 0) {
if (waitTime > 1_000_000)
LockSupport.parkNanos(waitTime);
else
while (timeSnapshot + waitTime > nanoTime())
;
leapTime.set(timeSnapshot + waitTime);
} else {
leapTime.set(timeSnapshot);
}
} finally {
rateLock.unlock();
}
}
private static void overflow(Runnable r, RateFriendlyThreadPoolExecutor e) {
if (!e.isShutdown())
e.executeWithDelay(r);
}
private boolean isThrottled(Exception e) {
return e.getMessage().contains("Reduce your rate");
}
public AtomicInteger getRate() {
return rate;
}
public void setRate(int rate) {
this.rate.set(rate);
minRate.compareAndSet(0, rate);
maxRate.compareAndSet(0, rate);
}
public AtomicInteger getMinRate() {
return minRate;
}
public void setMinRate(int minRate) {
this.minRate.set(minRate);
}
public AtomicInteger getMaxRate() {
return maxRate;
}
public void setMaxRate(int maxRate) {
this.maxRate.set(maxRate);
}
}