支持多线程的异步任务队列限流实现

12

我需要实现一个请求vk.com API的库。问题是API每秒只支持3个请求。我希望API是异步的。

重要提示:API应该支持从多个线程安全访问。

我的想法是实现一个名为throttler的类,它允许每秒不超过3个请求并延迟其他请求。

接口如下:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}

使用方法如下:

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());

如何实现节流器(throttler)?

目前我将其实现为一个队列,由后台线程处理。

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    /// TaskRequest has method Run() to run task
    /// TaskRequest uses TaskCompletionSource to provide new task 
    /// which is resolved when queue processed til this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}
这是处理队列的后台线程循环的缩短代码:
private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// Delay method calculates actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}

是否可能不使用后台线程来实现这个功能?


1
为什么不创建一个TaskScheduler的实现 https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx,它监视任务以将其限制为每秒3个?框架会处理其余部分。 - user1228
@Will 我考虑过这个问题,但不知道如何将所有线程的 API 调用绑定到 TaskScheduler 上? - STO
@Servy【需要引用来源】 - user1228
@Servy 只适用于 CPU 绑定型工作?再次强调,需要引用证明。 - user1228
1
@Will 很遗憾,自定义的 TaskScheduler 不能作为解决此问题的方案,因为接受 scheduler 参数的 Task.Factory.StartNew 方法不理解 async 委托。这里是我对此主题的研究。 - Theodor Zoulias
显示剩余4条评论
5个回答

14

我们将从一个简单问题的解决方案开始,即创建一个队列,可以同时处理最多N个任务,而不是每秒启动N个任务,并在此基础上进行改进:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

我们还将使用以下辅助方法来匹配TaskCompletionSource的结果到一个`Task`:
public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}

现在我们来讲解实际的解决方案。每次需要执行限流操作时,我们可以创建一个TaskCompletionSource,然后进入我们的TaskQueue并添加一个启动任务的项,将TCS与其结果匹配,不要等待它,然后延迟任务队列1秒钟。任务队列将不允许任务开始,直到过去一秒钟内没有启动N个任务,而操作本身的结果与创建Task相同:
public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue<T>(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}

谢谢,看起来不错,但是 Task.Delay(TimeSpan.FromSeconds(1)) 不会与另外两个任务 并行执行 而被延迟吗? - STO
@STO 是的,这就是重点。它只会与最多两个其他任务并行完成。在过去的一秒钟内,如果已经启动了三个任务,则不允许第四个、第五个等进入延迟。一旦这三个延迟中最老的一个已经过去,接下来的延迟就会到来,现在有一个新的最老的一秒延迟,当完成时,另一个任务就可以进入,等等,给你一个滚动的一秒窗口。 - Servy
明白了,接受作为答案。有一点是我们不需要总是延迟一秒钟,如果任务提前完成,我们应该延迟“剩余的时间”。 - STO
2
@STO 不,重要的是它被使用。这就是整个重点。队列只关心在开始给定操作后的1秒延迟。它根本不关心操作本身实际完成的时间。这是你的前提。因此,它确保您不能启动任务,只要过去一秒钟内已经启动了N个任务,并让下一个任务在自最旧的任务开始后1秒钟立即启动。 - Servy
1
@Servy,您之前发布过这个TaskQueue,但是我好像漏掉了什么。当我添加任务时,它从未被执行。您是否有此代码的可用示例? - Arnold Wiersma
显示剩余2条评论

4

我使用SemaphoreSlim的包装器解决了类似的问题。在我的情况下,我还有一些其他的限流机制,并且我需要确保即使请求1比请求3到达API的时间更长,请求也不会太频繁地命中外部API。我的解决方案是使用一个包装器来释放调用者,但实际的SemaphoreSlim只有在经过一定时间后才会被释放。

public class TimeGatedSemaphore
{
    private readonly SemaphoreSlim semaphore;
    public TimeGatedSemaphore(int maxRequest, TimeSpan minimumHoldTime)
    {
        semaphore = new SemaphoreSlim(maxRequest);
        MinimumHoldTime = minimumHoldTime;
    }
    public TimeSpan MinimumHoldTime { get; }

    public async Task<IDisposable> WaitAsync()
    {
        await semaphore.WaitAsync();
        return new InternalReleaser(semaphore, Task.Delay(MinimumHoldTime));
    }

    private class InternalReleaser : IDisposable
    {
        private readonly SemaphoreSlim semaphoreToRelease;
        private readonly Task notBeforeTask;
        public InternalReleaser(SemaphoreSlim semaphoreSlim, Task dependantTask)
        {
            semaphoreToRelease = semaphoreSlim;
            notBeforeTask = dependantTask;
        }
        public void Dispose()
        {
            notBeforeTask.ContinueWith(_ => semaphoreToRelease.Release());
        }
    }
}

示例用法:

private TimeGatedSemaphore requestThrottler = new TimeGatedSemaphore(3, TimeSpan.FromSeconds(1));
public async Task<T> MyRequestSenderHelper(string endpoint)
{
    using (await requestThrottler.WaitAsync())
        return await SendRequestToAPI(endpoint);        
}

0

这里有一个使用 秒表 的解决方案:

public class Throttler : IThrottler
{
    private readonly Stopwatch m_Stopwatch;
    private int m_NumberOfRequestsInLastSecond;
    private readonly int m_MaxNumberOfRequestsPerSecond;

    public Throttler(int max_number_of_requests_per_second)
    {
        m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second;
        m_Stopwatch = Stopwatch.StartNew();
    }


    public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        var elapsed = m_Stopwatch.Elapsed;

        if (elapsed > TimeSpan.FromSeconds(1))
        {
            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond)
        {
            TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed;

            await Task.Delay(time_to_wait);

            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        m_NumberOfRequestsInLastSecond++;

        return await task();
    }
}

以下是如何测试此代码的方法:

class Program
{
    static void Main(string[] args)
    {
        DoIt();

        Console.ReadLine();
    }

    static async Task DoIt()
    {
        Func<Task<int>> func = async () =>
        {
            await Task.Delay(100);
            return 1;
        };

        Throttler throttler = new Throttler(3);

        for (int i = 0; i < 10; i++)
        {
            var result = await throttler.Throttle(func);

            Console.WriteLine(DateTime.Now);
        }            
    }
}

1
抱歉,我在问题中忘记提及到多线程。已经编辑过了。 - STO
1
@Servy,当我回答这个问题时,并没有要求多个线程访问该对象。但是,如果您从单个线程调用Throttle 10次,则前三个将在第一秒中运行,第二个三个将在第二秒中运行...在发布答案之前,我实际测试了这一点。 - Yacoub Massad
@YacoubMassad 当浏览代码时,前三个任务不会进入任何if语句,它们将开始运行,然后接下来的七个任务都会进入if语句,被延迟一秒,然后它们将在7秒后同时运行。没有代码能够延迟第二个任务中的四个任务超过这个时间。 - Servy
@YacoubMassad 是的,这就是我在评论中所说的。你的测试看起来有效的原因是你实际上并没有一次性排队所有10个任务。你正在等待节流操作,这意味着你不会尝试启动第二个操作,直到第一个完成,你不会尝试启动第三个操作,直到第四个完成,以此类推。如果你只调用Throttle 10次,你的代码即使在单线程环境中也会完全崩溃,你最终会在第一秒执行三个操作,在接下来的七秒内执行其余的操作。 - Servy
这次编辑只是让你清楚地认识到你一直以来的假设是错误的。原始版本的问题并没有明确陈述前提条件然后再更改它,而只是澄清了一个未指明的点。 - Servy
显示剩余10条评论

0

你可以将其用作通用的

public TaskThrottle(int maxTasksToRunInParallel)
{
    _semaphore = new SemaphoreSlim(maxTasksToRunInParallel);
}

public void TaskThrottler<T>(IEnumerable<Task<T>> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class
{
    // Get Tasks as List
    var taskList = tasks as IList<Task<T>> ?? tasks.ToList();
    var postTasks = new List<Task<int>>();

    // When the first task completed, it will flag 
    taskList.ForEach(x =>
    {
        postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken));
    });

    taskList.ForEach(x =>
    {
        // Wait for open slot 
        _semaphore.Wait(timeoutInMilliseconds, cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        x.Start();
    });

    Task.WaitAll(taskList.ToArray(), cancellationToken);
}

-2

编辑:这个解决方案是可行的,但只有在可以在一个线程中处理所有请求时才使用它。否则,请使用被接受为答案的解决方案。

好的,多亏了.NET中管理单独线程任务队列的最佳方法

我的问题几乎是重复的,只是在执行之前添加了延迟,这实际上很简单。

这里的主要帮手是SemaphoreSlim类,它允许限制并行度。

因此,首先创建一个信号量:

// Semaphore allows run 1 thread concurrently.
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

最终版本的节流器看起来像这样

public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    await semaphore.WaitAsync();
    try
    {
        await delaySource.Delay();
        return await task();
    }
    finally
    {
        semaphore.Release();
    }
}

延迟源也非常简单:

private class TaskDelaySource
{
    private readonly int maxTasks;
    private readonly TimeSpan inInterval;
    private readonly Queue<long> ticks = new Queue<long>();

    public TaskDelaySource(int maxTasks, TimeSpan inInterval)
    {
        this.maxTasks = maxTasks;
        this.inInterval = inInterval;
    }

    public async Task Delay()
    {
        // We will measure time of last maxTasks tasks.
        while (ticks.Count > maxTasks)
            ticks.Dequeue();

        if (ticks.Any())
        {
            var now = DateTime.UtcNow.Ticks;
            var lastTick = ticks.First();
            // Calculate interval between last maxTasks task and current time
            var intervalSinceLastTask = TimeSpan.FromTicks(now - lastTick);

            if (intervalSinceLastTask < inInterval)
                await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds);
        }

        ticks.Enqueue(DateTime.UtcNow.Ticks);
    }
}

你的 Delay 方法不能安全地从多个线程调用。 - Servy
@Servy 是的,但它总是受到信号量的保护,最大并行度为1,不是吗? - STO
1
如果这样做,那么您永远不能拥有多个并发操作,因为第二个被限制的操作无法在第一个完成之前启动。 - Servy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接