Thread.Sleep 阻塞并行任务的执行

4

我正在调用一个工作方法,该方法调用数据库,然后迭代并返回值以进行并行处理。为了防止它对数据库造成负担,我在其中使用了Thread.Sleep来暂停对数据库的执行。然而,这似乎会阻塞仍在进行的Parallel.ForEach执行。如何最好地实现这一点以防止阻塞?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}

编辑: 我修改了代码以包含答案,但它仍未按照我的期望工作。我在GetWorkItems()调用中添加了.AsParallel().WithDegreeOfParallelism(10)。当基础线程处于休眠状态时,我认为并行应该继续执行,这个想法是错误的吗?

例子: 我有15个项目,它遍历并获取10个项目并开始处理。每当一个项目完成时,它会从GetWorkItems请求另一个项目,直到尝试请求第16个项目。此时,它应该停止尝试获取更多项目,但应继续处理11-15号项目,直到这些项目完成。并行应该是这样工作的吗?因为它目前并没有这样做。目前正在完成6个项目时,它会锁定在Parallel.ForEach中仍在运行的后续10个项目。


3
“Thread.Sleep” 几乎从来都不是一个好的解决方案。您能否进一步解释一下您想要实现什么?“WorkItemRepository.GetItemList” 是用来做什么的? - Jim Mischel
2
真正的程序不会 Sleep()。这样会浪费一个线程并且让任务调度器感到沮丧。 - H H
@Jim Mischel:我非常确定WorkItemRepository.GetItemList会执行类似于SELECT * FROM workqueue WHERE status = 'unprocessed'的操作,并将其编译成可排队的工作项类型。 - Bryan Boettcher
@JimMischel WorkItemRepository.GetItemList查询数据库,然后构建一个IList <WorkItems>。我已经在原始帖子中更新了我的编辑和示例。 - Conway Stern
4个回答

8
我建议您创建一个工作项的BlockingCollection(队列),并设置一个定时器,每30秒调用数据库来填充它。类似这样:
BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

在初始化时:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

每30秒钟,它将查询数据库中的项目。

对于调度要处理的工作项,有许多不同的解决方案。最接近您所拥有的解决方案是这个:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

那可以消除任何线程的阻塞,并让TPL决定如何最好地分配并行任务。
编辑:
实际上,更接近您所拥有的是:
foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
    // start task to process item
}

你也许可以使用以下方法:

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...

我不确定这个方法是否可行,也不知道效果如何,不过值得一试...
总体上,我的建议是将其视为生产者/消费者应用程序,其中生产者是定期查询新项目的线程。我的示例每隔N(此处为30)秒查询一次数据库,如果平均每30秒就可以清空工作队列,则效果很好。这将使从将项目发布到数据库直到你获得结果的平均延迟少于一分钟。
你可以减少轮询频率(从而降低延迟),但这会导致更多的数据库流量。
你也可以做得更复杂些。例如,如果在30秒后查询数据库并获得大量项目,则很可能会很快再次获得更多项目,因此您需要在15秒钟(或更短时间)内再次查询数据库。相反,如果在30秒后查询数据库并没有获得任何东西,则你可以等待更长时间再次查询。
你可以使用一次性计时器来设置这种自适应轮询。也就是说,在创建计时器时,当最后一个参数设为-1时,它将只触发一次。你的计时器回调函数将计算出下一次轮询前要等待多长时间,并调用Timer.Change以使用新值初始化计时器。

我实现了您建议的编辑。但是,计时器似乎没有触发多次。我在构造函数中添加了上面的代码片段和其他几个地方,希望能让它多次触发。System.Threading.Timer WorkItemTimer有什么想法可以让计时器工作吗?虽然BlockingCollection只有一次旅行,但它似乎正在工作。感谢您的帮助。 - Conway Stern
@ConwayStern:我无法想象计时器为什么会多次失败。在类范围内声明“WorkItemTimer”,然后在构造函数中初始化它。如果你在构造函数中声明了它,那么可能会被垃圾回收。 - Jim Mischel
这篇文章帮助我获得了更多搜索的想法。最终我使用了这个示例 使用BlockingCollection实现消费者/生产者模型 作为基础。谢谢。 - Conway Stern

3

那会对这里有什么帮助呢?它设置了线程数量的最大值,而OP似乎需要一个最小值。 - H H
1
@Henk Holterman,它设置的是最大处理器数量,而不是线程。在双核机器上,这将是两个处理器。由于Thread.Sleep调用不需要CPU时间,Microsoft建议“在查询执行大量非计算绑定工作(如文件I/O)的情况下,指定大于机器上核心数的并行度可能会更有益。” - scottm

2

你可能会遇到分区器的问题。

因为你正在传递一个IEnumerable,Parallel.ForEach将使用一个块分配器,它可以尝试从枚举中一次性获取几个元素。但是你的IEnumerable.MoveNext可能会睡眠,这会使事情变得混乱。

你可以编写自己的分区器,每次返回一个元素,但无论如何,我认为像Jim Mischel的建议那样采用生产者/消费者方法会更好。


0
你想通过睡眠来实现什么目的?从我所了解的情况来看,你试图避免频繁地调用数据库。我不知道有更好的方法可以做到这一点,但理想情况下,你的GetItemList调用应该是阻塞的,直到有数据可供处理为止。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接