异步任务的高效速率限制(每时间间隔N次执行)

4

当使用.NET 4.0/4.5任务时,我如何实现有效的速率限制算法(在时间间隔内执行N次特定操作)?例如,如果在一秒钟内发送了超过N条消息,则希望SendMessageAsync阻塞(而不发送另一条消息)。

        await Task.WhenAll(
                    Enumerable.Range(0, TOTAL_MESSAGES)
                    .Select(async x =>
                    {
                        await clientSession.SendMessageAsync(CreateMessage());
                    }));

我尝试在SendMessageAsync内部使用Task.Delay,但是由于等待Task.Delay会立即返回,所以下一条消息将被发送而没有任何阻塞。
public async Task<ResponseMessage> SendMessageAsync(RequestMessage message)
{

    int sleepTime;
    if (throttler.ShouldThrottle(out sleepTime))
    {
        await Task.Delay(sleepTime);
    }

    return await InternalMessageWithoutWaitingAsync(message);
}

我可以将await Task.Delay(sleepTime)更改为Thread.Sleep(sleepTime),然后等待再执行异步操作,但是我想知道在使用任务时这是否是一个好的实践。


只有当“限制器”发出信号“开始”时,我才会开始下一个任务。我认为这比启动所有任务并立即等待大部分任务要好得多。 - usr
2
基于时间的操作,如在一段时间间隔内进行限流,最好使用 Rx 来代替直接使用 async - Stephen Cleary
1
@RobertMircea 使用 Thread.Sleep 绝对是一种不好的做法(浪费资源)。我没有理解你关于 下一个消息将被发送而没有任何阻塞 的担忧。这是否意味着您需要按顺序执行所有任务?如果您的情况确实很复杂,可以考虑实现自己的 TaskScheduler如何:创建限制并发性的任务计划程序 - outcoldman
1
作为@StephenCleary提到的那样,问题是关于在时间间隔内进行限流而不是在某一时刻限制并行操作(并发)。 “下一个消息将被发送而不会阻塞”意味着下一个消息将在不尊重我尝试实现的1秒间隔内的速率限制的情况下发送。 - Robert Mircea
1
@RobertMircea 那么你可以实现自己的 TaskScheduler(如果你不想使用 Rx)。 - outcoldman
显示剩余2条评论
1个回答

2

async 代码中使用 Thread.Sleep 不是一个好习惯。

你可以尝试用以下方法实现相同的效果:

private SemaphoreSlim _semaphore = new SemaphoreSlim(N);
public async Task<ResponseMessage> SendMessageAsync(RequestMessage message)
{
  await _semaphore.WaitAsync();
  try
  {
    return await InternalMessageWithoutWaitingAsync(message);
  }
  finally
  {
    ReleaseSemaphoreAfterDelayAsync();
  }
}

private async Task ReleaseSemaphoreAfterDelayAsync()
{
  await Task.Delay(TimeInterval);
  _semaphore.Release();
}

我使用这段代码生成了400K条消息。问题在于,许多任务会被创建,消耗800MB的RAM,而只有N个正在处理消息,其余的被信号量阻塞。在等待处理完成时,如何停止创建新任务? - Robert Mircea
当我在ANTS Memory Profiler中运行此代码时,我看到有400k个任务占用了17,600,000字节。这可能是由于一些被提升的本地变量导致了内存压力;请参考Stephen Toub的资源herehere以获取调优思路。 - Stephen Cleary
谢谢建议。我没有重新考虑 Rx 的问题。我想保留的唯一范例是让调用方有机会使用 Task<ResponseMessage> 的美感。该库的设计如下:如果在30秒内未收到来自服务器的回复,任务将被取消;如果与服务器通信出现错误,则任务会捕获异常;或者任务会给调用方提供等待服务器回复的选项。生产者向库中“流式”传递消息(在生产过程中不限制事先已知的消息数量)。希望我可以在 Rx 中保持这个功能。 - Robert Mircea
现代版本的 Rx 与 async 的互操作性非常干净。例如,可以查看 ToTask 操作符。 - Stephen Cleary
1
@Prabhu:你可以使用await。请注意,VS2013不允许在finally(或catch)中使用await,这会使代码变得复杂。下一个版本的VS预计将允许在finally块中使用await - Stephen Cleary
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接