非阻塞的速率限制线程池执行器

9
我正在使用多个连接同时访问HTTP服务器。当服务器指示请求过于频繁时,我想要限制客户端的速度。我不希望更改我正在使用的HTTP库,而是想要扩展它。
为此,如何实现具有以下约束条件的ThreadPoolExecutor?
  • 执行程序具有可选的速率限制。
  • 当禁用速率限制时,它会尽可能快地执行任务(ThreadPoolExecutor的正常行为)。
  • 启用速率限制时,它最多可以每秒执行N个任务。
  • 速率限制适用于所有执行程序线程,而不是每个线程。
  • 不允许突发。也就是说,如果限制是每秒10个请求,我希望每100毫秒开始一个请求。我不希望所有线程一起触发,然后在剩下的时间里保持空闲。
  • 速率限制是动态的。如果请求失败,则速率降低。如果请求成功,则速率增加。
  • 当没有任务准备好执行时(考虑速率限制),线程被视为空闲状态。也就是说,我希望ThreadPoolExecutor将这些线程标记为空闲状态,并在适当时将它们停止,而不是阻塞线程直到达到速率限制。反过来,一旦到达执行下一个任务的时间,线程应该重新启动。

我所了解的


我猜那些给你的原帖点踩的人可能是因为你没有提供自己努力的证据。但是对于这个算法复杂的任务,我几乎看不出有什么样的证据可以提供。祝你好运找到解决方案。 - Oleg Sklyar
元评论在问题中没有位置,不要再编辑回去。问题的目的是为了超越你特定的需求,让未来的访问者受益,并且未来的访问者并不关心这里的过程。 - Martijn Pieters
@OlegSklyar,FYI,我已经把它搞定了。答案如下。 - Gili
@Gili 干得好! - Oleg Sklyar
2个回答

5

回答我的问题:

  • 没有完全不阻塞的解决方案。即使 ScheduledThreadPoolExecutor 也会保留至少一个线程等待队列返回一个新任务。
  • ThreadPoolExecutor 坐在一个 BlockingQueue 上。当没有剩余任务时,它会阻塞在 BlockingQueue.take() 上。
  • 该解决方案有三个移动部分:
  1. 速率限制器。
  2. 一个 BlockingQueue,它隐藏元素直到速率限制器允许它们被消耗。
  3. 坐在 BlockingQueue 顶部的 ThreadPoolExecutor

速率限制器

我提供了自己的速率限制器,基于令牌桶算法来克服 RateLimiter限制。源代码可以在这里找到。


BlockingQueue

我实现了一个 BlockingDeque(扩展了 BlockingQueue),因为将来我想尝试将失败的任务推回队列前面。

RateLimitedBlockingDeque.java

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static org.bitbucket.cowwoc.requirements.core.Requirements.requireThat;

/**
 * A blocking deque of elements, in which an element can only be taken when the deque-wide delay has expired.
 * <p>
 * The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if
 * unspecified, is equal to {@link Integer#MAX_VALUE}.
 * <p>
 * Even though methods that take elements, such as {@code take} or {@code poll}, respect the deque-wide delay the
 * remaining methods treat them as normal elements. For example, the {@code size} method returns the count of both
 * expired and unexpired elements.
 * <p>
 * This class and its iterator implement all of the <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @param <E> the type of elements in the deque
 * @author Gili Tzabari
 */
public final class RateLimitedBlockingDeque<E> implements BlockingDeque<E>
{
    private final int capacity;
    private final LinkedBlockingDeque<E> delegate;
    private final Bucket rateLimit = new Bucket();

    /**
     * Creates a {@code RateLimitedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
     */
    public RateLimitedBlockingDeque()
    {
        this.capacity = Integer.MAX_VALUE;
        this.delegate = new LinkedBlockingDeque<>();
    }

    /**
     * Creates a {@code RateLimitedBlockingDeque} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public RateLimitedBlockingDeque(int capacity)
    {
        this.capacity = capacity;
        this.delegate = new LinkedBlockingDeque<>(capacity);
    }

    /**
     * @return the capacity of the deque
     */
    public int getCapacity()
    {
        return capacity;
    }

    /**
     * Indicates the rate at which elements may be taken from the queue.
     *
     * @param elements the number of elements that may be taken per {@code period}
     * @param period   indicates how often elements may be taken
     * @throws NullPointerException     if {@code period} is null
     * @throws IllegalArgumentException if the requested rate is greater than element per nanosecond
     */
    public void setRate(long elements, Duration period)
    {
        synchronized (rateLimit)
        {
            Limit newLimit = new Limit(elements, period, 0, Long.MAX_VALUE);
            if (rateLimit.getLimits().isEmpty())
                rateLimit.addLimit(newLimit);
            else
            {
                Limit oldLimit = rateLimit.getLimits().iterator().next();
                rateLimit.replaceLimit(oldLimit, newLimit);
            }
        }
    }

    /**
     * Allows consumption of elements without limit.
     */
    public void removeRate()
    {
        synchronized (rateLimit)
        {
            rateLimit.removeAllLimits();
        }
    }

    @Override
    public void addFirst(E e)
    {
        delegate.addFirst(e);
    }

    @Override
    public void addLast(E e)
    {
        delegate.addLast(e);
    }

    @Override
    public boolean offerFirst(E e)
    {
        return delegate.offerFirst(e);
    }

    @Override
    public boolean offerLast(E e)
    {
        return delegate.offerLast(e);
    }

    @Override
    public void putFirst(E e) throws InterruptedException
    {
        delegate.putFirst(e);
    }

    @Override
    public void putLast(E e) throws InterruptedException
    {
        delegate.putLast(e);
    }

    @Override
    public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offerFirst(e, timeout, unit);
    }

    @Override
    public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offerLast(e, timeout, unit);
    }

    @Override
    public E removeFirst()
    {
        if (rateLimit.tryConsume())
            return delegate.removeFirst();
        throw new NoSuchElementException();
    }

    @Override
    public E removeLast()
    {
        if (rateLimit.tryConsume())
            return delegate.removeLast();
        throw new NoSuchElementException();
    }

    @Override
    public E pollFirst()
    {
        if (rateLimit.tryConsume())
            return delegate.pollFirst();
        return null;
    }

    @Override
    public E pollLast()
    {
        if (rateLimit.tryConsume())
            return delegate.pollLast();
        return null;
    }

    @Override
    public E takeFirst() throws InterruptedException
    {
        rateLimit.consume();
        return delegate.takeFirst();
    }

    @Override
    public E takeLast() throws InterruptedException
    {
        rateLimit.consume();
        return delegate.takeLast();
    }

    @Override
    public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException
    {
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollFirst(timeout, unit);
        return null;
    }

    @Override
    public E pollLast(long timeout, TimeUnit unit) throws InterruptedException
    {
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollLast(timeout, unit);
        return null;
    }

    @Override
    public E getFirst()
    {
        return delegate.getFirst();
    }

    @Override
    public E getLast()
    {
        return delegate.getLast();
    }

    @Override
    public E peekFirst()
    {
        return delegate.peekFirst();
    }

    @Override
    public E peekLast()
    {
        return delegate.peekLast();
    }

    @Override
    public boolean removeFirstOccurrence(Object o)
    {
        return delegate.removeFirstOccurrence(o);
    }

    @Override
    public boolean removeLastOccurrence(Object o)
    {
        return delegate.removeLastOccurrence(o);
    }

    @Override
    public boolean add(E e)
    {
        return delegate.add(e);
    }

    @Override
    public boolean offer(E e)
    {
        return delegate.offer(e);
    }

    @Override
    public void put(E e) throws InterruptedException
    {
        putLast(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offer(e, timeout, unit);
    }

    @Override
    public E remove()
    {
        return removeFirst();
    }

    @Override
    public E poll()
    {
        return pollFirst();
    }

    @Override
    public E take() throws InterruptedException
    {
        return takeFirst();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        return pollFirst(timeout, unit);
    }

    @Override
    public E element()
    {
        return getFirst();
    }

    @Override
    public E peek()
    {
        return peekFirst();
    }

    @Override
    public int remainingCapacity()
    {
        return delegate.remainingCapacity();
    }

    @Override
    public int drainTo(Collection<? super E> c)
    {
        int result = 0;
        while (true)
        {
            E next = pollFirst();
            if (next == null)
                break;
            c.add(next);
        }
        return result;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        int result = 0;
        do
        {
            E next = pollFirst();
            if (next == null)
                break;
            c.add(next);
        }
        while (result < maxElements);
        return result;
    }

    @Override
    public void push(E e)
    {
        addFirst(e);
    }

    @Override
    public E pop()
    {
        return removeFirst();
    }

    @Override
    public boolean remove(Object o)
    {
        return removeFirstOccurrence(o);
    }

    @Override
    public int size()
    {
        return delegate.size();
    }

    @Override
    public boolean contains(Object o)
    {
        return delegate.contains(o);
    }

    @Override
    public Object[] toArray()
    {
        return delegate.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a)
    {
        return delegate.toArray(a);
    }

    @Override
    public String toString()
    {
        return delegate.toString();
    }

    @Override
    public void clear()
    {
        delegate.clear();
    }

    @Override
    public Iterator<E> iterator()
    {
        return wrap(delegate.iterator());
    }

    /**
     * @param delegateIterator the iterator to delegate to
     * @return an iterator that respects the rate-limit
     */
    private Iterator<E> wrap(Iterator<E> delegateIterator)
    {
        return new Iterator<E>()
        {
            private E previousElement = null;

            @Override
            public boolean hasNext()
            {
                return delegateIterator.hasNext();
            }

            @Override
            public E next()
            {
                return delegateIterator.next();
            }

            @Override
            public void remove()
            {
                if (previousElement == null)
                    throw new IllegalStateException("next() not invoked, or remove() already invoked");
                try
                {
                    rateLimit.consume();
                }
                catch (InterruptedException e)
                {
                    throw new IllegalStateException(e);
                }
                delegateIterator.remove();
                previousElement = null;
            }
        };
    }

    @Override
    public Iterator<E> descendingIterator()
    {
        return wrap(delegate.descendingIterator());
    }

    @Override
    public boolean addAll(Collection<? extends E> c)
    {
        requireThat("c", c).isNotNull().isNotEqualTo("this", this);
        boolean modified = false;
        for (E e: c)
            if (add(e))
                modified = true;
        return modified;
    }

    @Override
    public boolean isEmpty()
    {
        return delegate.isEmpty();
    }

    @Override
    public boolean containsAll(Collection<?> c)
    {
        return delegate.containsAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c)
    {
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
        {
            E element = i.next();
            if (c.contains(element))
            {
                i.remove();
                modified = true;
            }
        }
        return modified;
    }

    @Override
    public boolean retainAll(Collection<?> c)
    {
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
        {
            E element = i.next();
            if (!c.contains(element))
            {
                i.remove();
                modified = true;
            }
        }
        return modified;
    }

    @Override
    public int hashCode()
    {
        return delegate.hashCode();
    }

    @Override
    public boolean equals(Object obj)
    {
        return delegate.equals(obj);
    }
}

源代码链接显示 404 找不到页面。如果可能的话,你能分享一下吗? - thaveethu gce
谢谢你提供的代码。我有一个疑问。由于你没有使用任何外部持久队列,比如Redis或JMS,这个程序能够承受大量请求吗?例如,如果需要处理100个请求每秒,但是实际进来的请求却是10万个每秒,那么会发生什么情况呢? - thaveethu gce
@thaveethugce Bucket是线程安全的,因此每次只有一个线程获取令牌,其他所有线程都会在互斥锁上等待或等待下一个可用令牌。您可以从JMH中调用Bucket.consume()来计算您的机器每秒能够消耗的平均令牌数。这应该相当高效。 - Gili
谢谢您花时间向我解释基础知识。很抱歉我仍然不是很清楚。如果有更多的请求涌入会发生什么?在某个时刻我们需要拒绝请求吗?有多少个请求可以等待获取令牌?更多的线程等待会导致内存溢出和请求被拒绝吗?假设速率限制中允许的每秒请求数(QPS)为1000,但实际进入的请求非常庞大,如每秒百万次。 - thaveethu gce
@thaveethugce 这取决于你正在查看哪个类。Bucket没有任何限制。尝试获取令牌将阻塞线程,直到轮到你为止。如果你在谈论 RateLimitedBlockingDequeue,那么你可以从 Javadoc 中看到它有一个相应的 capacity。如果你尝试将更多任务推入 deque 中,超过了 capacity 允许的范围,那么你将得到由 BlockingDeque.add() 指定的行为(即会抛出 IllegalStateException)。 - Gili

0

使用方法:

public static void main(String[] args) throws InterruptedException {
    RateFriendlyThreadPoolExecutor executor = new RateFriendlyThreadPoolExecutor(3, 5, 1, SECONDS, new LinkedBlockingDeque<>(100));
    executor.setRate(10);
    executor.setMinRate(1);
    executor.setMaxRate(100);

    for (int i = 0; i < 1000; i++) {
        int lap = i;
        executor.execute(() -> System.out.printf("%03d (%s) - %s - %s%n", lap, executor.getRate(), LocalDateTime.now(), Thread.currentThread().getName()));
    }

    executor.shutdown();
    executor.awaitTermination(60, SECONDS);
}

输出:

002 (10) - 2023-05-27T23:03:37.659658800 - pool-1-thread-3
000 (11) - 2023-05-27T23:03:37.659658800 - pool-1-thread-1
001 (11) - 2023-05-27T23:03:37.744859100 - pool-1-thread-2
105 (11) - 2023-05-27T23:03:37.930152500 - main
103 (12) - 2023-05-27T23:03:38.037876400 - pool-1-thread-4
104 (12) - 2023-05-27T23:03:38.130058800 - pool-1-thread-5
003 (12) - 2023-05-27T23:03:38.221655300 - pool-1-thread-3
004 (12) - 2023-05-27T23:03:38.314020700 - pool-1-thread-1
005 (12) - 2023-05-27T23:03:38.406202700 - pool-1-thread-2
006 (12) - 2023-05-27T23:03:38.573508200 - pool-1-thread-4
007 (13) - 2023-05-27T23:03:38.665875900 - pool-1-thread-5
008 (13) - 2023-05-27T23:03:38.742695200 - pool-1-thread-3

实施:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Math.min;
import static java.lang.System.nanoTime;

public class RateFriendlyThreadPoolExecutor extends ThreadPoolExecutor {

    private AtomicInteger rate = new AtomicInteger();
    private AtomicInteger minRate = new AtomicInteger();
    private AtomicInteger maxRate = new AtomicInteger();
    private AtomicLong leapTime = new AtomicLong();
    private ReentrantLock rateLock = new ReentrantLock();

    public RateFriendlyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, (r, e) -> overflow(r, (RateFriendlyThreadPoolExecutor) e));
    }

    @Override
    public void execute(Runnable rannable) {
        super.execute(() -> executeWithDelay(rannable));
    }

    protected void executeWithDelay(Runnable runnable) {
        int rateSnapshot = rate.get();
        limitRate(rateSnapshot, leapTime, rateLock);
        try {

            runnable.run();

            rate.compareAndSet(rateSnapshot, min(rateSnapshot + 1, maxRate.get()));
        } catch (Exception e) {
            if (!isThrottled(e))
                throw e;
            System.out.println("throttled at rate " + rateSnapshot);
            rate.set(minRate.get());
            execute(runnable);
        }
    }

    // works for parallel streams like a charm
    public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
        if (rate == 0)
            return;
        long targetLeapTime = 1_000_000_000 / rate;
        rateLock.lock();
        try {
            long timeSnapshot = nanoTime();
            long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
            if (waitTime > 0) {

                if (waitTime > 1_000_000)
                    LockSupport.parkNanos(waitTime);
                else
                    while (timeSnapshot + waitTime > nanoTime())
                        /* busy wait */;

                leapTime.set(timeSnapshot + waitTime);
            } else {
                leapTime.set(timeSnapshot);
            }
        } finally {
            rateLock.unlock();
        }
    }

    private static void overflow(Runnable r, RateFriendlyThreadPoolExecutor e) {
        if (!e.isShutdown())
            e.executeWithDelay(r);
    }

    private boolean isThrottled(Exception e) {
        return e.getMessage().contains("Reduce your rate");
    }

    public AtomicInteger getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate.set(rate);
        minRate.compareAndSet(0, rate);
        maxRate.compareAndSet(0, rate);
    }

    public AtomicInteger getMinRate() {
        return minRate;
    }

    public void setMinRate(int minRate) {
        this.minRate.set(minRate);
    }

    public AtomicInteger getMaxRate() {
        return maxRate;
    }

    public void setMaxRate(int maxRate) {
        this.maxRate.set(maxRate);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接