TPL架构问题

6
我目前正在处理一个项目,我们需要并行处理项目。这不是什么大问题 ;) 现在问题来了。我们有一个ID列表,每2秒钟我们要为每个ID调用一个StoredProcedure。 需要针对每个项目单独检查2秒钟,因为它们在运行时添加和删除。 此外,我们想配置最大并行度,因为数据库不应同时涌入300个线程。 正在处理的项目在完成前不应重新安排处理。原因是我们想防止在数据库出现延迟的情况下排队很多项目。
现在我们正在使用自开发组件,该组件具有主线程,定期检查哪些项目需要安排处理。一旦它获得列表,就会将其放在自定义IOCP线程池上,然后使用waithandles等待项目被处理。然后开始下一个迭代。使用IOCP是因为它提供了工作窃取。
我想用TPL/.NET 4版本替换这个自定义实现,并想知道您如何解决它(最理想的是简单易读/易维护)。 我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但它只限制了使用的线程数。离开了工作窃取,周期性执行项目....
理想情况下,它将成为一个通用组件,可用于定期处理项目列表的所有任务。
欢迎任何输入, tia Martin
2个回答

9

我认为你实际上不需要使用直接的TPL Tasks。首先,我会建立一个BlockingCollection,将其设置在默认的ConcurrentQueue上,并且没有设置BoundedCapacity来存储需要处理的ID。

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

从那里,我只需在从 BlockingCollection::GetConsumingEnumerable 返回的枚举上使用 Parallel::ForEach。在 ForEach 调用中,您将设置您的 ParallelOptions::MaxDegreeOfParallelism。在 ForEach 的正文中,您将执行您的存储过程。
现在,一旦存储过程执行完成,您不想重新安排执行至少两秒钟。没问题,使用回调函数安排一个 System.Threading.Timer,该回调函数将简单地将 ID 添加回提供的 BlockingCollection
Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(),
    new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4 // read this from config
    },
    (id) =>
    {
       // ... execute sproc ...

       // Need to declare/assign this before the delegate so that we can dispose of it inside 
       Timer timer = null;

       timer = new Timer(
           _ =>
           {
               // Add the id back to the collection so it will be processed again
               idsToProcess.Add(id);

               // Cleanup the timer
               timer.Dispose();
           },
           null, // no state, id wee need is "captured" in the anonymous delegate
           2000, // probably should read this from config
           Timeout.Infinite);
    }

最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding,以便正在处理的可枚举对象停止阻塞并完成,Parallel::ForEach 将退出。例如,如果这是一个 Windows 服务,则应该在 OnStop 中执行此操作。

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();

更新

您在评论中提出了一个合理的担忧,即您可能会在任何给定时间处理大量的ID,并担心每个ID都会有太多的开销。我完全同意这一点。因此,在处理大量ID的情况下,我建议改用另一个队列来保存“休眠”ID,该队列由单个短间隔计时器监视,而不是使用以定时器为基础的方法。首先,您需要一个ConcurrentQueue来放置休眠的ID:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

现在,我在这里使用一个由两部分组成的元组Tuple来举例说明,但您可能希望创建一个更加强类型的结构体(或者至少使用using语句别名)以提高可读性。该元组包含id和表示其进入队列时间的DateTime。
现在,您还需要设置监视此队列的计时器:
Timer wakeSleepingIdsTimer = new Timer(
   _ =>
   {
       DateTime utcNow = DateTime.UtcNow;

       // Pull all items from the sleeping queue that have been there for at least 2 seconds
       foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
       {
           // Add this id back to the processing queue
           idsToProcess.Enqueue(id);
       }
   },
   null, // no state
   Timeout.Infinite, // no due time
   100 // wake up every 100ms, probably should read this from config
 );

然后,您只需将Parallel::ForEach更改为执行以下操作,而不是为每个设置计时器:

(id) =>
{
       // ... execute sproc ...

       sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
}

不错的想法,但你觉得这会不会造成一些资源问题呢?我的意思是,如果列表中有500个元素,我有点担心会有大量的计时器在运行... - Martin Moser
曾考虑过这一点,但由于您没有明确给出范围,因此我一直在等待是否符合您的需求的回复。您可以轻松通过另一个队列和单个计时器来解决这个问题,它将监控队列中到期的条目,并将它们移回主工作队列。我的答案中将添加详细信息。 - Drew Marsh

1

这与您在问题中提到的方法非常相似,但是使用了TPL任务。任务完成后,它会将自己添加回要调度的事物列表中。

在此示例中,在普通列表上使用锁定相当丑陋,可能需要更好的集合来保存要调度的事物列表。

// Fill the idsToSchedule
for (int id = 0; id < 5; id++)
{
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id));
}

// LongRunning will tell TPL to create a new thread to run this on
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning);

这将启动SchedulingLoop,它实际上执行检查是否已经过去两秒钟自某个操作以来。

// Tuple of the last time an id was processed and the id of the thing to schedule
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>();
static int currentlyProcessing = 0;
const int ProcessingLimit = 3;

// An event loop that performs the scheduling
public static void SchedulingLoop()
{
    while (true)
    {
        lock (idsToSchedule)
        {
            DateTime currentTime = DateTime.Now;
            for (int index = idsToSchedule.Count - 1; index >= 0; index--)
            {
                var scheduleItem = idsToSchedule[index];
                var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds;

                // start it executing in a background task
                if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit)
                {
                    Interlocked.Increment(ref currentlyProcessing);

                    Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun);

                    // Schedule this task to be processed
                    Task.Factory.StartNew(() =>
                        {
                            Console.WriteLine("Executing {0}", scheduleItem.Item2);

                            // simulate the time taken to call this procedure
                            Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500);

                            lock (idsToSchedule)
                            {
                                idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2));
                            }

                            Console.WriteLine("Done Executing {0}", scheduleItem.Item2);
                            Interlocked.Decrement(ref currentlyProcessing);
                        });

                    // remove this from the list of things to schedule
                    idsToSchedule.RemoveAt(index);
                }
            }
        }

        Thread.Sleep(100);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接