在继续任务上使用Task.WaitAll()只会延迟原始任务的执行?

5

背景:

我有一个控制台应用程序,它创建了用于从数据库处理数据的“任务”(称为Level1任务)。每个任务都会再次创建自己的任务来处理分配给它的数据的每个部分(Level2任务)。

每个Level2任务都有一个继续任务与之关联,并使用代码对继续任务进行 WaitAll

我使用的是.NET 4.0(没有async/await)。

问题:

但是这样做会产生问题-结果发现,在计划所有可用的Level1任务之前,没有一个Level2任务被启动。这在任何情况下都不是最优的。

问题:

改变代码以等待原始的Level2任务及其继续任务似乎可以解决此问题。然而,我并不完全确定为什么会这样。

您有任何想法吗?

我唯一能想到的是-由于继续任务尚未开始,因此等待其完成毫无意义。但即使是这种情况,我也希望至少有一些Level2任务已经开始了。但实际上,它们从未开始。

示例:

我创建了一个演示该行为的示例控制台应用程序:

  1. 按原样运行它,您将看到它首先计划了所有任务,然后才开始从Level2任务中实际写入线路。

  2. 但是,请注释掉标记的代码块并取消注释替换,然后一切都可以按预期工作。

您能告诉我原因吗?

public class Program
{
    static void Main(string[] args)
    {
        for (var i = 0; i < 100; i++)
        {
            Task.Factory.StartNew(() => SomeMethod());
            //Thread.Sleep(1000);
        }

        Console.ReadLine();
    }

    private static void SomeMethod()
    {
        var numbers = new List<int>();

        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }

        var tasks = new List<Task>();

        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task");

            var numberSafe = number;

            /* Code to be replaced START */

            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
                .ContinueWith(task =>
                {
                    Console.WriteLine("Continuation {0}", task.Id);
                });

            tasks.Add(nextTask);

            /* Code to be replaced END */

            /* Replacement START */

            //var originalTask = Task.Factory.StartNew(() =>
            //{
            //    Console.WriteLine("Got number: {0}", numberSafe);
            //});

            //var contTask = originalTask
            //    .ContinueWith(task =>
            //    {
            //        Console.WriteLine("Continuation {0}", task.Id);
            //    });

            //tasks.Add(originalTask);
            //tasks.Add(contTask);

            /* Replacement END */
        }

        Task.WaitAll(tasks.ToArray());
    }
}

дҪ еҝ…йЎ»дҪҝз”ЁContinueWithеҗ—пјҹиҝҳжҳҜеҸҜд»ҘдҪҝз”Ёasync/awaitпјҹ - noseratio - open to work
@Noseratio - 这是 .NET 4.0 版本 - 没有异步/等待。 - Joanna Derks
1
如果您使用VS2012+,则仍然可以在.NET 4.0中使用它:http://stackoverflow.com/tags/async-await/info - noseratio - open to work
那么你的代码确实需要完全重构,这个链接可能会有所帮助:http://blogs.msdn.com/b/pfxteam/archive/2010/11/21/10094564.aspx - noseratio - open to work
我的观点是要为ContinueWith使用正确的任务组合和迭代模式。如果时间允许,我会将其发布为答案。 - noseratio - open to work
显示剩余4条评论
4个回答

4
我认为你正在看到的是“任务内联”行为。引用自MSDN

在某些情况下,当等待任务时,它可能会在执行等待操作的线程上同步执行。这提高了性能,因为它利用了现有的线程,避免了需要额外的线程。为了防止由于重入而导致的错误,只有在相关线程的本地队列中找到等待目标时,才会发生任务内联。

你不需要100个任务来看到这个行为。我已经修改了你的程序,使其具有4个一级任务(我有四核CPU)。每个一级任务仅创建一个二级任务。
static void Main(string[] args)
{
    for (var i = 0; i < 4; i++)
    {
        int j = i;
        Task.Factory.StartNew(() => SomeMethod(j)); // j as level number
    }
}

在您的原始程序中,nextTask是继续执行的任务 - 所以我只是简化了这个方法。
private static void SomeMethod(int num)
{
    var numbers = new List<int>();

    // create only one level 2 task for representation purpose
    for (var i = 0; i < 1; i++)
    {
        numbers.Add(i);
    }

    var tasks = new List<Task>();

    foreach (var number in numbers)
    {
        Console.WriteLine("Before start task: {0} - thread {1}", num, 
                              Thread.CurrentThread.ManagedThreadId);

        var numberSafe = number;

        var originalTask = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Got number: {0} - thread {1}", num, 
                                    Thread.CurrentThread.ManagedThreadId);
        });

        var contTask = originalTask
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0} - thread {1}", num, 
                                    Thread.CurrentThread.ManagedThreadId);
            });

        tasks.Add(originalTask); // comment and un-comment this line to see change in behavior

        tasks.Add(contTask); // same as adding nextTask in your original prog.

    }

    Task.WaitAll(tasks.ToArray());
}

这是一个样例输出 - 当你注释掉 tasks.Add(originalTask); 这一行代码时 - 这是你的第一段代码块。

Before start task: 0 - thread 4
Before start task: 2 - thread 3
Before start task: 3 - thread 6
Before start task: 1 - thread 5
Got number: 0 - thread 7
Continuation 0 - thread 7
Got number: 1 - thread 7
Continuation 1 - thread 7
Got number: 3 - thread 7
Continuation 3 - thread 7
Got number: 2 - thread 4
Continuation 2 - thread 4

以下是一些示例输出 - 在保留 tasks.Add(originalTask); 的情况下,这是您的第二个块

Before start task: 0 - thread 4
Before start task: 1 - thread 6
Before start task: 2 - thread 5
Got number: 0 - thread 4
Before start task: 3 - thread 3
Got number: 3 - thread 3
Got number: 1 - thread 6
Got number: 2 - thread 5
Continuation 0 - thread 7
Continuation 1 - thread 7
Continuation 3 - thread 7
Continuation 2 - thread 4

从第二个案例中可以看到,当你在与启动任务相同的线程上等待originalTask时,任务内联将使其在相同的线程上运行 - 这就是为什么您会更早地看到得到数字..消息的原因。


有趣 - 我会了解更多并在此基础上进行更多的测试。 - Joanna Derks
我发现所有的答案都非常有帮助 - 既然你是第一个解释并且提供了有用的例子,我会将你的答案标记为被接受的。谢谢! - Joanna Derks
@JoannaTurban:很高兴能帮到您。如果您正在考虑其他替代方案,建议您查看一下TPL.Dataflow和其中的BufferBlock<>。它是一个异步/非阻塞的生产者-消费者数据结构,您可以使用async/await与之配合使用。 - YK1

2
你的代码问题在于 阻塞 Task.WaitAll(tasks.ToArray())。默认的 TPL 任务调度器不会为你使用 Factory.StartNew 启动的每个任务都使用一个新的池线程。而你启动了 100 个 Level1 任务,每个任务都会使用 Task.WaitAll 阻塞一个线程。
这造成了瓶颈。使用默认大小的 ThreadPool,我同时运行了 ~20 个线程,但只有其中 4 个实际上在同时执行(CPU 核心数)。因此,一些任务将只被排队,并在较早的任务完成时稍后启动。要了解我的意思,请尝试像这样更改你的代码:
static void Main(string[] args)
{
    for (var i = 0; i < 100; i++)
    {
        Task.Factory.StartNew(() => SomeMethod(), 
            TaskCreationOptions.LongRunning);
    }

    Console.ReadLine();
}

“TaskCreationOptions.LongRunning”可以给您所需的行为,但这当然是一种错误的解决方案。
正确的解决方案是尽可能避免阻塞代码。如果必须要阻塞等待,则应该只在最高层级上执行。
为了解决这个问题,您的代码可以像下面这样重新设计。请注意使用“ContinueWhenAll”、“Unwrap”和(可选)“ExecuteSynchronously”,它有助于消除阻塞代码并减少涉及的线程池线程数量。这个版本性能更好。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
    static void Main(string[] args)
    {
        var tasks = new List<Task>();

        for (var i = 0; i < 100; i++)
        {
            tasks.Add(Task.Factory.StartNew(() => SomeMethod(i)).Unwrap());
        }

        // blocking at the topmost level
        Task.WaitAll(tasks.ToArray());

        Console.WriteLine("Enter to exit...");
        Console.ReadLine();
    }

    private static Task<Task[]> SomeMethod(int n)
    {
        Console.WriteLine("SomeMethod " + n);

        var numbers = new List<int>();

        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }

        var tasks = new List<Task>();

        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task " + number);

            var numberSafe = number;

            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0}", task.Id);
            }, TaskContinuationOptions.ExecuteSynchronously);

            tasks.Add(nextTask);
        }

        return Task.Factory.ContinueWhenAll(tasks.ToArray(), 
            result => result, TaskContinuationOptions.ExecuteSynchronously);
    }
}

理想情况下,在实际项目中,应尽可能使用自然异步的API(例如,"在.Net 4.5中使用SqlDataReader的新异步方法"),并仅将Task.Run / Task.Factory.StartNew用于CPU密集型计算任务。对于服务器端应用程序(例如ASP.NET Web API),Task.Run / Task.Factory.StartNew通常只会增加多余的线程切换开销。除非您真的需要同时执行多个CPU密集型作业以损害可扩展性,否则它不会加快HTTP请求的完成速度。
我知道下面的选项可能不可行,但我强烈建议升级到VS2012+并使用async/await来实现类似这样的逻辑。这将非常值得投资,因为它极大地加快了编码过程,并产生更简单、更清晰和更少出错的代码。你仍然可以使用Microsoft.Bcl.Async来针对.NET 4.0。

1
谢谢您的解释 - 这是一个有趣的替代方案,可能是解决手头问题更可靠的解决方案。 - Joanna Derks

1
如果我没记错的话,等待一个尚未安排的任务可能会同步执行它。(请参见这里) 如果在另一种情况下,你的代码也可能出现这种行为,这并不奇怪。
请记住,线程行为高度依赖于实现和机器,这里发生的事情可能是这样的:
  • 由于调用Task.StartNew和任务实际在线程池中执行之间存在延迟,大多数所谓的“Level 1”任务,如果不是全部,都会在第一个任务实际执行之前被调度。
  • 由于默认任务调度程序使用.NET ThreadPool,因此在此处安排的所有任务都可能在ThreadPool线程上执行。
  • 一旦执行“Level 1”任务,调度队列将填充所有“Level 1”任务。
  • 每次执行“Level 1”任务时,它会安排所需数量的“Level 2”任务,但这些任务都是在“Level 1”任务之后安排的。
  • 当“Level 1”任务到达等待“Level 2”任务的所有继续的点时,执行线程进入等待状态。
  • 有许多ThreadPool线程处于等待状态时,程序迅速达到ThreadPool饥饿状态,强制ThreadPool分配新线程(可能超过100个)以解决饥饿问题
  • 一旦最后一个“Level 1”任务到达等待状态,则ThreadPool至少分配一个额外的线程。
  • 这个最后分配的额外线程现在可以执行“Level 2”任务及其继续,因为所有“Level 1”任务都已完成。
  • 经过一段时间,“Level 1”任务将拥有所有“Level 2”任务。此“Level 1”任务将从其等待中唤醒并完成其执行,从而释放另一个ThreadPool线程,并加速剩余“Level 2”任务和继续执行。
当您使用替代方法时,发生的变化是因为您直接在等待任务的数组中引用了“Level 2”任务,Task.WaitAll方法有机会同步执行“Level 2”任务,而不是空闲等待。这在最初的情况下是不可能发生的,因为继续任务不能同步运行。
总之,在ThreadPool线程中等待是导致线程饥饿和观察到的奇怪行为的原因。虽然在等待任务的代码中进行的优化使线程饥饿行为消失了,但显然您不应该依赖它。
要解决您的初始问题,最好遵循lil-raz的建议,放弃内部任务。
如果您可以访问C# 5.0,则还可以考虑使用异步/等待模式编写代码,而不依赖于等待。

0
我必须说,这段代码真的不太乐观。你创建了100个任务,并不意味着你会有100个线程。在每个任务内部,你又创建了两个新任务,这样会超出调度程序的限制。如果这些任务与数据库读取有关,为什么不将它们标记为长时间处理并丢弃内部任务呢?

这只是一个示例,用于演示行为。真正的应用程序正在进行数据库调用,但这并不是问题的一部分。它们不应以任何方式长时间运行。问题是 - 为什么代码的更改会以这种方式影响处理。 - Joanna Derks

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接