为什么 await async 如此缓慢?

21

我终于拥有了VS2012,并创建了一个简单的演示程序,以测试异步和等待带来的潜在性能提升,但令我失望的是它更慢了!可能我做错了什么,但也许你可以帮我解决问题。(我还添加了一个简单的多线程解决方案,它的运行速度比预期快)

我的代码使用一个类对数组进行求和,基于您系统上的核数(-1)。我的系统有4个核心,所以我通过多线程看到了约2倍的加速(2.5个线程),但在使用异步/等待时,相同的操作速度减慢了2倍。

代码:(请注意,您需要添加对System.Management的引用才能使核心检测器正常工作)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Management;
using System.Diagnostics;

namespace AsyncSum
{
    class Program
    {
        static string Results = "";

        static void Main(string[] args)
        {
            Task t = Run();
            t.Wait();

            Console.WriteLine(Results);
            Console.ReadKey();
        }

        static async Task Run()
        {
            Random random = new Random();

            int[] huge = new int[1000000];

            for (int i = 0; i < huge.Length; i++)
            {
                huge[i] = random.Next(2);
            }

            ArraySum summer = new ArraySum(huge);

            Stopwatch sw = new Stopwatch();

            sw.Restart();
            long tSum = summer.Sum();
            for (int i = 0; i < 100; i++)
            {
                tSum = summer.Sum();
            }
            long tticks = sw.ElapsedTicks / 100;

            long aSum = await summer.SumAsync();
            sw.Restart();
            for (int i = 0; i < 100; i++)
            {
                aSum = await summer.SumAsync();
            }
            long aticks = sw.ElapsedTicks / 100;

            long dSum = summer.SumThreaded();
            sw.Restart();
            for (int i = 0; i < 100; i++)
            {
                dSum = summer.SumThreaded();
            }
            long dticks = sw.ElapsedTicks / 100;


            long pSum = summer.SumParallel();
            sw.Restart();
            for (int i = 0; i < 100; i++)
            {
                pSum = summer.SumParallel();
            }
            long pticks = sw.ElapsedTicks / 100;

            Program.Results += String.Format("Regular Sum: {0} in {1} ticks\n", tSum, tticks);
            Program.Results += String.Format("Async Sum: {0} in {1} ticks\n", aSum, aticks);
            Program.Results += String.Format("Threaded Sum: {0} in {1} ticks\n", dSum, dticks);
            Program.Results += String.Format("Parallel Sum: {0} in {1} ticks\n", pSum, pticks);
        }
    }

    class ArraySum
    {
        int[] Data;
        int ChunkSize = 1000;
        int cores = 1;


        public ArraySum(int[] data)
        {
            Data = data;

            cores = 0;
            foreach (var item in new System.Management.ManagementObjectSearcher("Select * from Win32_Processor").Get())
            {
                cores += int.Parse(item["NumberOfCores"].ToString());
            }
            cores--;
            if (cores < 1) cores = 1;

            ChunkSize = Data.Length / cores + 1;
        }

        public long Sum()
        {
            long sum = 0;
            for (int i = 0; i < Data.Length; i++)
            {
                sum += Data[i];
            }
            return sum;
        }

        public async Task<long> SumAsync()
        {
            Task<long>[] psums = new Task<long>[cores];
            for (int i = 0; i < psums.Length; i++)
            {
                int start = i * ChunkSize;
                int end = start + ChunkSize;

                psums[i] = Task.Run<long>(() =>
                {
                    long asum = 0;
                    for (int a = start; a < end && a < Data.Length; a++)
                    {
                        asum += Data[a];
                    }
                    return asum;
                });
            }

            long sum = 0;
            for (int i = 0; i < psums.Length; i++)
            {
                sum += await psums[i];
            }

            return sum;
        }

        public long SumThreaded()
        {
            long sum = 0;
            Thread[] threads = new Thread[cores];
            long[] buckets = new long[cores];
            for (int i = 0; i < cores; i++)
            {
                int start = i * ChunkSize;
                int end = start + ChunkSize;
                int bucket = i;
                threads[i] = new Thread(new ThreadStart(() =>
                {
                    long asum = 0;
                    for (int a = start; a < end && a < Data.Length; a++)
                    {
                        asum += Data[a];
                    }
                    buckets[bucket] = asum;
                }));
                threads[i].Start();
            }

            for (int i = 0; i < cores; i++)
            {
                threads[i].Join();
                sum += buckets[i];
            }

            return sum;
        }

        public long SumParallel()
        {
            long sum = 0;
            long[] buckets = new long[cores];
            ParallelLoopResult lr = Parallel.For(0, cores, new Action<int>((i) =>
            {
                int start = i * ChunkSize;
                int end = start + ChunkSize;
                int bucket = i;
                long asum = 0;
                for (int a = start; a < end && a < Data.Length; a++)
                {
                    asum += Data[a];
                }
                buckets[bucket] = asum;
            }));

            for (int i = 0; i < cores; i++)
            {
                sum += buckets[i];
            }

            return sum;
        }
    }
}

有什么想法吗?我使用async/await的方式不对吗?如果有建议,我很乐意尝试。


1
首先,你的多线程示例实际上不会起作用。你没有同步访问聚合总和变量,那里存在竞态条件。 - Servy
7
我可能错了,但我认为它实际上并没有使用多核。如果我理解C#异步编程的文章所说的内容,它只是在每个任务周围创建状态机,而没有真正实现任何并行处理。它只是在完成时包装事件处理程序,因此一个核心仍在执行所有工作,只是额外增加了一些开销。你需要使用并行库才能看到差异。但是我可能对所有这些还不太了解。 - Joe Enos
3
Task.Run的调用会将任务安排到线程池中执行。 - Eric Lippert
3
你应该使用 Stopwatch 来计时,而不是 DateTime - Eric Lippert
2
仔细观察您的程序:它并不是在比较“async”和“Join”;而是在衡量将任务调度到线程池仅创建一堆线程之间的差异。“await”与此无关。 - Eric Lippert
显示剩余10条评论
4个回答

29

将"异步"和"并行化"分开是很重要的。 await 用于帮助编写异步代码。 在并行运行的代码中可能(或可能不会)涉及到异步,而异步的代码可能(或可能不会)并行运行。

await 的设计并不旨在使并行代码更快。 await 的目的是使编写异步代码更加容易,同时最小化负面的性能影响。使用await 不会比编写正确的非 await 异步代码更快(尽管由于使用 await 编写正确的代码更容易,因此有时会更快,因为程序员没有能力在没有 await 的情况下正确编写该异步代码,或者不愿意花时间这样做。如果非异步代码编写得好,它的性能将与 await 代码相当甚至略优。

C# 确实专门支持并行处理,但不是通过 await 实现的。任务并行库(TPL)以及并行 LINQ(PLINQ)具有几种非常有效的方法来并行处理代码,通常比朴素的线程实现更高效。

在您的情况下,使用 PLINQ 的有效实现可能如下所示:

public static int Sum(int[] array)
{
    return array.AsParallel().Sum();
}

请注意,这将有效地将输入序列划分为可以并行运行的块;它将负责确定块的适当大小和并发工作者的数量,并适当地聚合这些工作者的结果,以确保正确的结果(与您的线程示例不同)并且高效(这意味着它不会完全串行化所有聚合)。


@ohmusama,不重蹈覆辙会有许多好处,+1,除非你对这个数据有特殊的了解,否则与你所能想到的任何东西一样快。这很容易打字,并且易于正确操作。如果数据根据已知函数分布,您可能能够绕过此操作,但这里不适用。 - Jodrell
我喜欢可以进行并行求和,但是我想知道完整的做法。所以我添加了一个并行测试,这是一个好的例子吗? - ohmusama
我认为,考虑到我们对数据的生成了解,将答案近似为“1000000”可能是最优的。 - Jodrell
这个解决方案比单线程的慢!不确定为什么;也许 AsParallel() 强制访问数组通过 IEnumerator,增加了显著的开销? - Daniel
@Daniel 首先,我会质疑你的基准测试方法,因为基准测试很难,人们经常做不好。其次,设置所有这些需要一定的开销,如果用于执行生产工作的时间足够短,则开销可能比并行化带来的收益更高;增加更多的工作或更昂贵的工作将有助于在基准测试中缓解这种开销。 - Servy
@Servy 好的,我重新运行了相同的测试,Daniel是正确的,减速实际上是由于在运行时计算lambda表达式引起的。再次运行它,结果比线性方法更快(即使使用async/await)。 - ohmusama

13

async并不适合进行大规模的并行计算。你可以使用Task.RunTask.WhenAll完成基本的并行任务,但任何严肃的并行工作都应该使用任务并行库(例如Parallel)来完成。客户端异步代码涉及“响应性”,而不是“并行处理”。

常见的做法是使用Parallel进行并行处理,然后将其包装在Task.Run中,并在其上使用await以保持UI的响应性。


@ohmusama 不是很好。Parallel.For在并行聚合工作方面并不特别出色,因此使用PLINQ会更好,就像我的示例一样。您的循环与其他示例没有太大区别,而非聚合函数中每个集合项都独立处理的情况下,Parallel.For能够比您更好地提供数据并动态确定适当的工作线程数,这将帮助您看到明显的改进。 - Servy
“Parallel.For”可以聚合,但语法很笨拙。对于简单的求和示例,PLINQ更好。 - Stephen Cleary
1
@StephenCleary 对我来说,它看起来并不像是需要大量计算的工作。 - Marc2001

10

你的基准测试存在一些缺陷:

  • 你正在计时第一次运行,其中包括初始化时间(加载class Task,JIT编译等)
  • 你正在使用DateTime.Now,该方法对毫秒级别的计时太不准确了。你需要使用StopWatch

解决这两个问题后,我获得了以下基准测试结果:

Regular Sum:  499946 in 00:00:00.0047378
Async Sum:    499946 in 00:00:00.0016994
Threaded Sum: 499946 in 00:00:00.0026898

现在异步方式成为最快的解决方案,只需要不到2毫秒。

接下来出现的问题是:计时短至2毫秒非常不可靠,如果其他进程在后台使用CPU,您的线程可能会被暂停更长时间。您应该在数千次基准测试运行中对结果进行平均。

此外,你的核心检测出了什么问题?我的四核正在使用333334的块大小,只允许3个线程运行。


7
基准测试存在的问题不仅仅是这些。你需要更多地做一些工作,才能使结果在统计意义上有意义。首先应该调用运行方法数千次,并对结果进行平均值;在测时之间应运行GC集合;应进行热身运行以确保所有内容都被JIT编译,当然还有其他很多事情。话虽如此,问题本身存在缺陷(正如我的答案所述),而不仅仅是基准测试。 - Servy
1
我发布的结果在多次运行中非常稳定;但你说得对,你应该进行成千上万次的运行,这样你就不会冒着上下文切换彻底破坏你的结果的风险;我已经编辑了我的答案。 - Daniel
@Daniel,你在我的评论中实际上已经给出了正确的答案,如果你可以将它添加到你的答案中,我会给你点赞。事实上,那个lambda表达式必须在运行时编译,因此第一次运行速度非常慢,但第二次运行比线性求和更快。谢谢你找到了这个问题! - ohmusama
@ohmusama:这就是我在第一点(初始化时间)中所指的。这与使用lambda表达式无关(lambda的初始化并不比普通方法更昂贵),只是因为需要加载更多的代码来使整个async/await机制工作。 - Daniel

6

快速查看,结果是可以预料的:您的异步求和只使用一个线程,而您异步等待它完成,因此比多线程的求和要慢。

如果您在其执行任务时有其他事情需要完成,那么您将使用异步。因此,这不是任何速度/响应改进的正确测试。


+1。在“async”/“await”情况下是单线程的,这就解释了为什么会变慢——需要运行比基本的“Sum”调用更多的代码来支持所有的“async”基础设施。 - Alexei Levenkov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接