C#中的异步方法单元测试

7

我有一个问题。

我尝试在我的C#代码中复制多个sp(存储过程)调用,但我想以异步方式进行。

TSQL示例: (执行sp @key = 15072000173475; 执行sp @key = 15072000173571; ... 执行sp @key = n;

[TestClass]
public class UnitTestNomenclature {
    [TestMethod]
    public void ParallelSQLMethod() {
        Task scropeTasks = null;
        //real amount is more then 1500
        long[] keys = new long[] {15072000173475,15072000173571 ... n };

        try {
            var tasks = keys.Select( i =>  Task.Run(async () => { await RunStoredProc(i); }));
            scropeTasks =  Task.WhenAll(tasks);

            scropeTasks.Wait();
        } catch (Exception ex) {
            Debug.WriteLine("Exception: " + ex.Message);

            Debug.WriteLine("IsFaulted: " + scropeTasks.IsFaulted);
            foreach (var inx in scropeTasks.Exception.InnerExceptions) {
                Debug.WriteLine("Details: " + inx.Message);
            }
        }

        Assert.AreEqual(1, 1);
    }

    public async Task RunStoredProc(long scollNumbParam) {
        const string strStoredProcName = @"[dbo].[sp]";
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {
            await conn.OpenAsync();
            Debug.WriteLine("============================================ Connection is open: ==============================================");

            // info
            Debug.WriteLine(String.Format("Connection: {0}", conn.ClientConnectionId));
            Debug.WriteLine(String.Format("State: {0}", conn.State.ToString()));

            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) {

                SqlParameter scrParam = new SqlParameter() {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);

                Debug.WriteLine("Start of Proccesing: " + scollNumbParam);
                await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
                Debug.WriteLine("End of Proccesing: " + scollNumbParam);

            }
        }

        Debug.WriteLine("============================================ Connection is closed: ==============================================");
    }
}

这是我在输出窗口中得到的内容:
========== Connection is open: ========
Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
========== Connection is open: ==========
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
.....
End of Proccesing: 15072000173475
=========== Connection is closed: =========
End of Proccesing: 15072000173571
=========== Connection is closed: =========

....

A timeout occurred while waiting for memory resources to execute the query in resource pool 'default' (2). Rerun the query.
Actual error number: 8645
Actual line number: 98

同时调试信息显示连接池已满。我认为主要原因是连接没有被适当处理,但我如何使用异步实现呢?如果在声明异步任务之前尝试只打开一个连接并将其传递给RunStoredProc方法,则会出现连接不支持MultipleActiveResultSets的错误。
using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;)) {

                    conn.OpenAsync();
                    var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i, conn); }));
                    scropeTasks = Task.WhenAll(tasks);

                    scropeTasks.Wait();
                }

                Debug.WriteLine("========== Connection is closed: ==========");

这是我在输出窗口中得到的内容:
Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
======= Connection is open: =============
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
========= Connection is open: =========

什么阻止你将测试变成异步的呢? - Nkosi
什么都没有,我做了这个并得到了以上的错误。我非常想知道为什么会出现这些错误。 - AllmanTool
我期望获得异步处理。我正在寻找解决方案。我该如何在每次迭代后实现连接释放事件?这是否可能?我正在尝试弄清楚异步/等待+ado.net如何协同工作。 - AllmanTool
通过按顺序对它们进行迭代。虽然需要更长的时间,但至少连接将被适当地释放,以免过载资源。您还可以考虑将它们分批处理。 - Nkosi
连接池最大容量超过了100。也许,有没有办法将此任务分为多个部分(每次100个),然后刷新连接池? - AllmanTool
显示剩余5条评论
3个回答

8
您有大约1500个任务同时执行,还混合了异步和阻塞调用(例如.Wait),这可能会导致死锁。
将测试变为异步并尽量避免使用async void,除非在事件处理程序上使用。
尝试按顺序迭代它们。 这需要更长的时间,但至少连接将被适当地释放,以避免过载资源。 您也可以考虑分批次完成它们。
[TestMethod]
public async Task ParallelSQLMethod() {
    //real amount is more then 1500
    var keys = new long[] { 
        15072000173475, 
        15072000173571, 
        //....., n
    };
    var tasks = keys.Select(i => RunStoredProc(i));
    var batchSize = 50; //Or smaller

    //run tasks in batches
    var sequence = tasks;
    while (sequence.Any()) {
        var batch = sequence.Take(batchSize);
        sequence = sequence.Skip(batchSize);

        await Task.WhenAll(batch);
    }
}

2
我担心在这里看到了关于异步/等待/并发/线程的经典问题。测试中存在多个问题,我将逐一解释。
1) 测试用例架构。您没有说明编写的单元测试和SQL服务器是否在同一台计算机上或不同的计算机上。
如果在同一台计算机上,则应该使用最大值(n_cores/2, 1)个连接。
如果在不同的计算机上,则应该使用1-3个连接。
这些数字可以根据存储过程行为、长/短计算、传输数据量、连接速度等进行调整。
2) SQL连接并发问题。您不能打开一个连接,然后以某种方式尝试通过此连接同时调用1500个请求。实际上,甚至不能同时调用两个请求。
这就是它告诉您的:连接不支持MultipleActiveResultSets。
您必须使用一个打开的连接来为一个请求提供服务。
但是!您不必仅将其用于一个请求并关闭它,您可以在第一个请求完成后运行下一个请求,这比关闭并创建新连接要快得多。您只需要依次通过每个连接运行这些请求...
3) 因此,正确的测试用例架构应如下所示:
- 异步测试方法, - 将所有密钥推送到ConcurrentQueue队列中; - 然后是大小基于所需连接数的Task[]任务数组, - 启动每个任务并将它们存储到数组中, - 等待Task.WhenAll(tasks)。
我非常喜欢玩并发/并行代码,但是增加越来越多的任务而不协调它们并不能帮助加速事情,相反会浪费资源...
4) 示例:
[TestClass]
public class UnitTestNomenclature
{
    [TestMethod]
    public async Task ParallelSQLMethod()
    {
        long[] keys = new long[] { 15072000173475, 15072000173571 };

        ConcurrentQueue<long> queue = new ConcurrentQueue<long>(keys);

        int connections = Math.Max(1, Environment.ProcessorCount / 2);

        Task[] tasks =
        Enumerable
        .Range(0, connections)
        .Select(i => Task.Run<Task>(() => RunConnection(i, queue)).Unwrap())
        .ToArray()
        ;

        await Task.WhenAll(tasks);
    }

    public async Task RunConnection(int connection, ConcurrentQueue<long> queue)
    {
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;"))
        {
            await conn.OpenAsync();
            Debug.WriteLine($"====== Connection[{connection}] is open: ======");

            Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
            Debug.WriteLine($"Connection[{connection}].State: {conn.State}");

            long scollNumbParam;

            while (queue.TryDequeue(out scollNumbParam))
            {
                await RunStoredProc(conn, connection, scollNumbParam);
                Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
                Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
            }
        }

        Debug.WriteLine($"====== Connection[{connection}] is closed  ======");
    }

    public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam)
    {
        const string strStoredProcName = @"[dbo].[sp]";

        using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure })
        {
            SqlParameter scrParam = new SqlParameter()
            {
                ParameterName = "@KEYKRT",
                Value = scollNumbParam,
                SqlDbType = SqlDbType.BigInt
            };
            cmd.Parameters.Add(scrParam);

            Debug.WriteLine($"Connection[{connection}] Start of Proccesing: " + scollNumbParam);
            await cmd.ExecuteNonQueryAsync();
            Debug.WriteLine($"Connection[{connection}] End of Proccesing: " + scollNumbParam);
        }
    }
}

1
我对我的代码进行了一些实验,并取得了适当的结果(异步处理)。我改变了连接链接(添加:Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true),即增加了连接池和连接时间持续时间。
  • 最大连接池大小 (Max Pool Size)

  • 连接池中最少连接数 (Min Pool Size)

  • 在连接池中保留连接的秒数 (Connection Lifetime)(0为最大值)

提示: 过多的池 Max Pool Size(默认为100)可能会挂起您的服务器(我试过了)。
另外,我注意到在我的连接字符串中使用'MultipleActiveResultSets=true'时,我没有收到'连接不支持MultipleActiveResultSets'的异常,但处理是同步的。您可以阅读有关此问题的内容(MARS)。

结论:服务器上的并行执行不是MARS功能,而且MARS操作不是线程安全的。MARS没有设计为消除应用程序中多个连接的所有要求。如果应用程序需要对服务器并行执行命令,则应使用多个连接。通常出于这种原因而使用它。


是的,通过一个通道进行并发通信并不容易。我研究了几个消息队列系统,通常对这些系统处理并发感到非常失望。由于工具箱中充满了笨重的解决方案,所以做这些事情并不容易。可能是MARS功能被嫁接到顺序技术之上,而2005年,我认为这是非常有可能的。 - ipavlu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接