我认为你实际上不需要使用直接的TPL Tasks
。首先,我会建立一个BlockingCollection
,将其设置在默认的ConcurrentQueue
上,并且没有设置BoundedCapacity
来存储需要处理的ID。
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>()
从那里,我只需在从
BlockingCollection::GetConsumingEnumerable
返回的枚举上使用
Parallel::ForEach
。在
ForEach
调用中,您将设置您的
ParallelOptions::MaxDegreeOfParallelism
。在
ForEach
的正文中,您将执行您的存储过程。
现在,一旦存储过程执行完成,您不想重新安排执行至少两秒钟。没问题,使用回调函数安排一个
System.Threading.Timer
,该回调函数将简单地将 ID 添加回提供的
BlockingCollection
。
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4
},
(id) =>
{
Timer timer = null;
timer = new Timer(
_ =>
{
idsToProcess.Add(id);
timer.Dispose();
},
null,
2000,
Timeout.Infinite);
}
最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding
,以便正在处理的可枚举对象停止阻塞并完成,Parallel::ForEach
将退出。例如,如果这是一个 Windows 服务,则应该在 OnStop
中执行此操作。
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
更新
您在评论中提出了一个合理的担忧,即您可能会在任何给定时间处理大量的ID,并担心每个ID都会有太多的开销。我完全同意这一点。因此,在处理大量ID的情况下,我建议改用另一个队列来保存“休眠”ID,该队列由单个短间隔计时器监视,而不是使用以定时器为基础的方法。首先,您需要一个ConcurrentQueue
来放置休眠的ID:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>()
现在,我在这里使用一个由两部分组成的元组
Tuple
来举例说明,但您可能希望创建一个更加强类型的结构体(或者至少使用
using
语句别名)以提高可读性。该元组包含id和表示其进入队列时间的DateTime。
现在,您还需要设置监视此队列的计时器:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
idsToProcess.Enqueue(id);
}
},
null,
Timeout.Infinite,
100
);
然后,您只需将Parallel::ForEach
更改为执行以下操作,而不是为每个设置计时器:
(id) =>
{
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}