为什么这个TAP async/await代码比TPL版本慢?

18

我需要编写一个控制台应用程序,调用Microsoft Dynamics CRM Web服务来对8,000多个CRM对象执行操作。Web服务调用的详细信息与此处无关,但我需要一个多线程客户端,以便可以并行进行调用。我希望能够从配置设置中控制使用的线程数,并且还希望应用程序在服务错误达到配置定义的阈值时能够取消整个操作。

我使用Task Parallel Library Task.Run和ContinueWith编写了代码,跟踪了正在进行的调用(线程)数量、接收到的错误数量以及用户是否从键盘上取消。一切都很顺利,我有广泛的日志记录,以确保线程干净地完成,并且在运行结束时一切都是整洁的。我可以看到程序在并行使用最大数量的线程,并且如果达到我们的最大限制,则等待正在运行的任务完成后再开始另一个任务。

在我的代码审查过程中,我的同事建议使用async/await而不是tasks和continuations,因此我创建了一个分支并以这种方式重写了它。结果非常有趣-async/await版本几乎慢了两倍,而且从未达到允许的并行操作/线程的最大数量。TPL版本总是达到10个并行线程,而async/await版本从未超过5个。

我的问题是:我在编写async/await代码(或TPL代码)时犯了错误吗?如果我没有编写错误的代码,你能解释为什么async/await不那么高效吗?这是否意味着继续使用TPL进行多线程编码更好?

请注意,我测试的代码实际上没有调用CRM-CrmClient类只是按配置中指定的持续时间(五秒钟)休眠一段时间,然后抛出异常。这意味着没有外部变量会影响性能。

为了回答这个问题,我创建了一个简化版程序,将两个版本结合在一起;哪一个被调用取决于配置设置。它们都从引导运行程序开始,设置环境、创建队列类,然后使用TaskCompletionSource等待完成。CancellationTokenSource用于向用户发出取消信号。要处理的ID列表从嵌入文件中读取并推送到ConcurrentQueue中。它们都从StartCrmRequest开始调用多达最大线程数次;随后,每次处理结果时,ProcessResult方法再次调用StartCrmRequest,保持一直进行,直到所有ID都被处理。

您可以从这里克隆/下载完整程序:https://bitbucket.org/kentrob/pmgfixso/

以下是相关配置:

<appSettings>
    <add key="TellUserAfterNCalls" value="5"/>
    <add key="CrmErrorsBeforeQuitting" value="20"/>
    <add key="MaxThreads" value="10"/>
    <add key="CallIntervalMsecs" value="5000"/>
    <add key="UseAsyncAwait" value="True" />
</appSettings>

从TPL版本开始,以下是启动队列管理器的引导程序运行器:

public static class TplRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        Console.CancelKeyPress += (s, args) =>
        {
            CancelCrmClient();
            args.Cancel = true;
        };

        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new TplQueue(parameters)
            .Start(CancellationTokenSource.Token, idList);

        while (!taskCompletionSource.Task.IsCompleted)
        {
            if (Console.KeyAvailable)
            {
                if (Console.ReadKey().Key != ConsoleKey.Q) continue;
                Console.WriteLine("When all threads are complete, press any key to continue.");
                CancelCrmClient();
            }
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }

    private static void CancelCrmClient()
    {
        CancellationTokenSource.Cancel();
        Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
    }
}

这里是TPL队列管理器本身:

public class TplQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;
    private CancellationToken cancelToken;

    public TplQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }

        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            Task.Run((Action) StartCrmRequest, cancellationToken);
        }

        return taskCompletionSource;
    }

    private void StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            Program.TellUser("Crm client cancelling...");
            ClearQueue();
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private void ProcessResult(Task<CrmResultMessage> response)
    {
        if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            StartCrmRequest();
        }
    }

    private int GetThreadCount()
    {
        lock (locker)
        {
            return threadCount;
        }
    }

    private void IncrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount + 1;
        }
    }

    private int DecrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount - 1;
            return threadCount;
        }
    }

    private void ClearQueue()
    {
        idQueue = new ConcurrentQueue<string>();
    }

    private static void ShowProgress(int processedCount)
    {
        Program.TellUser("{0} activities processed.", processedCount);
    }
}

请注意,我知道一些计数器不是线程安全的,但它们并不关键;threadCount变量是唯一关键的。

这里是虚拟的CRM客户端方法:

public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
    // Here we would normally call a CRM web service.
    return Task.Run(() =>
    {
        try
        {
            if (callIntervalMsecs > 0)
            {
                Thread.Sleep(callIntervalMsecs);
            }
            throw new ApplicationException("Crm web service not available at the moment.");
        }
        catch
        {
            return new CrmResultMessage(activityId, CrmResult.Failed);
        }
    });
}

以下是相同的async/await类(为简洁起见省略了常用方法):

public static class AsyncRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new AsyncQueue(parameters)
            .StartAsync(CancellationTokenSource.Token, idList).Result;

        while (!taskCompletionSource.Task.IsCompleted)
        {
            ...
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }
}

异步/等待队列管理器:

public class AsyncQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private CancellationToken cancelToken;
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;

    public AsyncQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
        IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }
        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            await StartCrmRequest();
        }

        return taskCompletionSource;
    }

    private async Task StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            ...
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
        ProcessResult(crmMessage);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private async void ProcessResult(CrmResultMessage response)
    {
        if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            await StartCrmRequest();
        }
    }
}

因此,将MaxThreads设置为10,将CrmErrorsBeforeQuitting设置为20,在我的计算机上,TPL版本完成时间为19秒,而async/await版本需要35秒。考虑到我需要进行8000多次调用,这是一个显着的差异。有什么想法吗?


2
考虑在所有不需要返回到原始同步上下文的等待任务上使用ConfigureAwait(false)。 - Kirill Shlenskiy
1
@RobKent 你会注意到Jon Skeet的讨论是微观优化,次要关注点。 - GregC
1
根据我的经验,async/await的性能比ContinueWith要好:https://dev59.com/H2Ag5IYBdhLWcg3wM4lb#23878905 - noseratio - open to work
1
@Noseratio,我在阅读你(确实非常有趣的)基准测试后意识到 ExecuteSynchronously 并不是罪魁祸首。 - Kirill Shlenskiy
显示剩余11条评论
3个回答

10

我想我在这里看到了问题,或者至少其中的一部分。仔细看下面的两个代码片段;它们并不等价。

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    Task.Run((Action) StartCrmRequest, cancellationToken);
}

并且:

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    await StartCrmRequest();
}
在原始代码中(假定它在功能上是可行的),只有一个对ContinueWith的调用。如果要保留原始行为,我会预期在简单的重写中看到这么多await语句。

这不是硬性规定,只适用于简单情况,但仍然是需要注意的好事。


刚开始理解这个。我的(可能有误的)理解是,在等待之后,代码的主体仍然会继续运行,因此循环将继续进行。现在只是要确认一下。 - Rob Kent
是的,我刚测试了一下,带有await的循环是同步的。 - Rob Kent
2
@RobKent,它仍然是异步的,但是是按顺序而非并行的。 - Kirill Shlenskiy
谢谢,我通过在循环中使用相同的Task.Run来解决了它。TAP版本现在比TPL版本快20%。 - Rob Kent
对我来说,这是一个警示信号 - 在性能方面,这两者应该非常接近。可能基准测试并不完全正确(基准测试本身就是一门艺术),但也可能表明在没有你的意图下逻辑上发生了某些变化。 - Kirill Shlenskiy
显示剩余3条评论

4

我认为你过于复杂化了解决方案,最终在两种实现中都没有达到想要的效果。

首先,与任何HTTP主机的连接都受服务点管理器限制。客户端环境的默认限制为2,但您可以自行增加。

无论您生成多少线程,活动请求数都不会超过允许的数量。

然后,正如有人指出的那样,await在逻辑上会阻塞执行流程。

最后,你花时间创建了一个AsyncQueue,当你应该使用TPL数据流


是的,感谢您指出ServicePointManager的限制 - 我之前并不知道。虽然在实现实际的网络调用时这肯定是相关的,但对于这个问题来说,它是无关紧要的,因为我们只需要知道为什么异步/等待执行速度更慢而不实际进行网络调用。还是感谢您的评论。 - Rob Kent
如果你真的想证明这一点,你需要一个更简单的例子。 - Paulo Morgado

0

当使用async/await实现时,我期望I/O绑定算法在单个线程上运行。与@KirillShlenskiy不同,我认为负责“带回”到调用者上下文的位并不会导致减速。我认为您试图将其用于I/O绑定操作而超过了线程池。它主要设计用于计算绑定操作。

看一下ForEachAsync。我觉得这就是你要找的东西(Stephen Toub的讨论,你也会发现Wischik的视频很有意义):

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

(使用并发度来减少内存占用)

http://vimeo.com/43808831 http://vimeo.com/43808833


谢谢,我会查看那些链接。 - Rob Kent
在那篇文章中,有一条评论特别赞同:“这些新的await和async关键字真的让我头疼。有时候我觉得我理解了它们,但另一次我意识到我并没有。” 我需要更多地学习它。 - Rob Kent
@RobKent,我很高兴分享这个内容丰富的链接。祝你阅读和观看愉快! - GregC

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接