我需要编写一个控制台应用程序,调用Microsoft Dynamics CRM Web服务来对8,000多个CRM对象执行操作。Web服务调用的详细信息与此处无关,但我需要一个多线程客户端,以便可以并行进行调用。我希望能够从配置设置中控制使用的线程数,并且还希望应用程序在服务错误达到配置定义的阈值时能够取消整个操作。
我使用Task Parallel Library Task.Run和ContinueWith编写了代码,跟踪了正在进行的调用(线程)数量、接收到的错误数量以及用户是否从键盘上取消。一切都很顺利,我有广泛的日志记录,以确保线程干净地完成,并且在运行结束时一切都是整洁的。我可以看到程序在并行使用最大数量的线程,并且如果达到我们的最大限制,则等待正在运行的任务完成后再开始另一个任务。
在我的代码审查过程中,我的同事建议使用async/await而不是tasks和continuations,因此我创建了一个分支并以这种方式重写了它。结果非常有趣-async/await版本几乎慢了两倍,而且从未达到允许的并行操作/线程的最大数量。TPL版本总是达到10个并行线程,而async/await版本从未超过5个。
我的问题是:我在编写async/await代码(或TPL代码)时犯了错误吗?如果我没有编写错误的代码,你能解释为什么async/await不那么高效吗?这是否意味着继续使用TPL进行多线程编码更好?
请注意,我测试的代码实际上没有调用CRM-CrmClient类只是按配置中指定的持续时间(五秒钟)休眠一段时间,然后抛出异常。这意味着没有外部变量会影响性能。
为了回答这个问题,我创建了一个简化版程序,将两个版本结合在一起;哪一个被调用取决于配置设置。它们都从引导运行程序开始,设置环境、创建队列类,然后使用TaskCompletionSource等待完成。CancellationTokenSource用于向用户发出取消信号。要处理的ID列表从嵌入文件中读取并推送到ConcurrentQueue中。它们都从StartCrmRequest开始调用多达最大线程数次;随后,每次处理结果时,ProcessResult方法再次调用StartCrmRequest,保持一直进行,直到所有ID都被处理。
您可以从这里克隆/下载完整程序:https://bitbucket.org/kentrob/pmgfixso/
以下是相关配置:
<appSettings>
<add key="TellUserAfterNCalls" value="5"/>
<add key="CrmErrorsBeforeQuitting" value="20"/>
<add key="MaxThreads" value="10"/>
<add key="CallIntervalMsecs" value="5000"/>
<add key="UseAsyncAwait" value="True" />
</appSettings>
从TPL版本开始,以下是启动队列管理器的引导程序运行器:
public static class TplRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
Console.CancelKeyPress += (s, args) =>
{
CancelCrmClient();
args.Cancel = true;
};
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new TplQueue(parameters)
.Start(CancellationTokenSource.Token, idList);
while (!taskCompletionSource.Task.IsCompleted)
{
if (Console.KeyAvailable)
{
if (Console.ReadKey().Key != ConsoleKey.Q) continue;
Console.WriteLine("When all threads are complete, press any key to continue.");
CancelCrmClient();
}
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
private static void CancelCrmClient()
{
CancellationTokenSource.Cancel();
Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
}
}
这里是TPL队列管理器本身:
public class TplQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
private CancellationToken cancelToken;
public TplQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
Task.Run((Action) StartCrmRequest, cancellationToken);
}
return taskCompletionSource;
}
private void StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
Program.TellUser("Crm client cancelling...");
ClearQueue();
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private void ProcessResult(Task<CrmResultMessage> response)
{
if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
StartCrmRequest();
}
}
private int GetThreadCount()
{
lock (locker)
{
return threadCount;
}
}
private void IncrementThreadCount()
{
lock (locker)
{
threadCount = threadCount + 1;
}
}
private int DecrementThreadCount()
{
lock (locker)
{
threadCount = threadCount - 1;
return threadCount;
}
}
private void ClearQueue()
{
idQueue = new ConcurrentQueue<string>();
}
private static void ShowProgress(int processedCount)
{
Program.TellUser("{0} activities processed.", processedCount);
}
}
请注意,我知道一些计数器不是线程安全的,但它们并不关键;threadCount变量是唯一关键的。
这里是虚拟的CRM客户端方法:
public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
// Here we would normally call a CRM web service.
return Task.Run(() =>
{
try
{
if (callIntervalMsecs > 0)
{
Thread.Sleep(callIntervalMsecs);
}
throw new ApplicationException("Crm web service not available at the moment.");
}
catch
{
return new CrmResultMessage(activityId, CrmResult.Failed);
}
});
}
以下是相同的async/await类(为简洁起见省略了常用方法):
public static class AsyncRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new AsyncQueue(parameters)
.StartAsync(CancellationTokenSource.Token, idList).Result;
while (!taskCompletionSource.Task.IsCompleted)
{
...
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
}
异步/等待队列管理器:
public class AsyncQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private CancellationToken cancelToken;
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
public AsyncQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
await StartCrmRequest();
}
return taskCompletionSource;
}
private async Task StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
...
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
ProcessResult(crmMessage);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private async void ProcessResult(CrmResultMessage response)
{
if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
await StartCrmRequest();
}
}
}
因此,将MaxThreads设置为10,将CrmErrorsBeforeQuitting设置为20,在我的计算机上,TPL版本完成时间为19秒,而async/await版本需要35秒。考虑到我需要进行8000多次调用,这是一个显着的差异。有什么想法吗?
async/await
的性能比ContinueWith
要好:https://dev59.com/H2Ag5IYBdhLWcg3wM4lb#23878905 - noseratio - open to workExecuteSynchronously
并不是罪魁祸首。 - Kirill Shlenskiy