为什么使用IAsyncEnumerable比返回async/await Task<T>更慢?

8
我正在测试 C# 8 的异步流,并且似乎当我尝试使用旧的 async/await 模式并返回 Task>时,应用程序运行速度更快。(我使用了秒表进行了测量,尝试运行多次,结果是我提到的旧模式似乎比使用 IAsyncEnumerable 更快)。
以下是我编写的简单控制台应用程序代码 (我也在考虑是否以错误方式从数据库加载数据)。
class Program
    {
        static async Task Main(string[] args)
        {

            // Using the old pattern 
            //Stopwatch stopwatch = Stopwatch.StartNew();
            //foreach (var person in await LoadDataAsync())
            //{
            //    Console.WriteLine($"Id: {person.Id}, Name: {person.Name}");
            //}
            //stopwatch.Stop();
            //Console.WriteLine(stopwatch.ElapsedMilliseconds);


            Stopwatch stopwatch = Stopwatch.StartNew();
            await foreach (var person in LoadDataAsyncStream())
            {
                Console.WriteLine($"Id: {person.Id}, Name: {person.Name}");
            }
            stopwatch.Stop();
            Console.WriteLine(stopwatch.ElapsedMilliseconds);


            Console.ReadKey();
        }


        static async Task<IEnumerable<Person>> LoadDataAsync()
        {
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            var people = new List<Person>();
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                //SqlDataReader
                await connection.OpenAsync();

                string sql = "Select * From Person";
                SqlCommand command = new SqlCommand(sql, connection);

                using (SqlDataReader dataReader = await command.ExecuteReaderAsync())
                {
                    while (await dataReader.ReadAsync())
                    {
                        Person person = new Person();
                        person.Id = Convert.ToInt32(dataReader[nameof(Person.Id)]);
                        person.Name = Convert.ToString(dataReader[nameof(Person.Name)]);
                        person.Address = Convert.ToString(dataReader[nameof(Person.Address)]);
                        person.Occupation = Convert.ToString(dataReader[nameof(Person.Occupation)]);
                        person.Birthday = Convert.ToDateTime(dataReader[nameof(Person.Birthday)]);
                        person.FavoriteColor = Convert.ToString(dataReader[nameof(Person.FavoriteColor)]);
                        person.Quote = Convert.ToString(dataReader[nameof(Person.Quote)]);
                        person.Message = Convert.ToString(dataReader[nameof(Person.Message)]);

                        people.Add(person);
                    }
                }

                await connection.CloseAsync();
            }

            return people;
        }

        static async IAsyncEnumerable<Person> LoadDataAsyncStream()
        {
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                //SqlDataReader
                await connection.OpenAsync();

                string sql = "Select * From Person";
                SqlCommand command = new SqlCommand(sql, connection);

                using (SqlDataReader dataReader = await command.ExecuteReaderAsync())
                {
                    while (await dataReader.ReadAsync())
                    {
                        Person person = new Person();
                        person.Id = Convert.ToInt32(dataReader[nameof(Person.Id)]);
                        person.Name = Convert.ToString(dataReader[nameof(Person.Name)]);
                        person.Address = Convert.ToString(dataReader[nameof(Person.Address)]);
                        person.Occupation = Convert.ToString(dataReader[nameof(Person.Occupation)]);
                        person.Birthday = Convert.ToDateTime(dataReader[nameof(Person.Birthday)]);
                        person.FavoriteColor = Convert.ToString(dataReader[nameof(Person.FavoriteColor)]);
                        person.Quote = Convert.ToString(dataReader[nameof(Person.Quote)]);
                        person.Message = Convert.ToString(dataReader[nameof(Person.Message)]);

                        yield return person;
                    }
                }

                await connection.CloseAsync();
            }
        }

我想知道,IAsyncEnumerable是否不适用于这种情况,或者在使用IAsyncEnumerable查询数据时出了什么问题?我可能错了,但实际上我期望使用IAsyncEnumerable会更快。 (顺便说一句...差异通常在几百毫秒内)
我尝试了10,000行的示例数据。
以下是填充数据的代码,以防万一...
static async Task InsertDataAsync()
        {
            string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
            using (SqlConnection connection = new SqlConnection(connectionString))
            {
                string sql = $"Insert Into Person (Name, Address, Birthday, Occupation, FavoriteColor, Quote, Message) Values";


                for (int i = 0; i < 1000; i++)
                {
                    sql += $"('{"Randel Ramirez " + i}', '{"Address " + i}', '{new DateTime(1989, 4, 26)}', '{"Software Engineer " + i}', '{"Red " + i}', '{"Quote " + i}', '{"Message " + i}'),";
                }

                using (SqlCommand command = new SqlCommand(sql.Remove(sql.Length - 1), connection))
                {
                    command.CommandType = CommandType.Text;

                    await connection.OpenAsync();
                    await command.ExecuteNonQueryAsync();
                    await connection.CloseAsync();
                }

            }
        }

5
没什么奇怪的。使用IAsyncEnumerable时,你等待每个人。使用Task<IEnumerable>时,只需等待一次。使用IAsyncEnumerable的优势是你能够在获取到每个人时看到他们的信息:不必等待所有人都被获取完毕。如果你不需要这样的功能,就不要使用IAsyncEnumerable - canton7
1
@canton7 这并不完全正确。在 LoadDataAsyncStream 中,代码也在等待每个 ExecuteReaderAsync 的调用。 - Fabian Bigler
1
@FabianBigler 我在谈论如何使用 IAsyncEnumerable / Task<IEnumerable>。在这两种情况下,创建它所需的等待次数是相同的。 - canton7
2
实际上,IAsyncEnumerable<T> 实现允许“生成”值的批次,使得对于已经分批的值,MoveNextAsync 可以是同步的。 - Paulo Morgado
@TheodorZoulias 我尝试过不在控制台中输出,但性能仍然似乎更倾向于使用Task<IEnumerable>。 - Randel Ramirez
显示剩余4条评论
2个回答

5

IAsyncEnumerable<T>并不比Task<T>本质上更快或更慢,这取决于具体实现。

IAsyncEnumerable<T>用于异步检索数据,尽快提供单个值。

IAsyncEnumerable<T>允许批量生成值,从而使得MoveNextAsync的某些调用同步执行,如下例所示:

async Task Main()
{
    var hasValue = false;
    var asyncEnumerator = GetValuesAsync().GetAsyncEnumerator();
    do
    {
        var task = asyncEnumerator.MoveNextAsync();
        Console.WriteLine($"Completed synchronously: {task.IsCompleted}");
        hasValue = await task;
        if (hasValue)
        {
            Console.WriteLine($"Value={asyncEnumerator.Current}");
        }
    }
    while (hasValue);
    await asyncEnumerator.DisposeAsync();
}

async IAsyncEnumerable<int> GetValuesAsync()
{
    foreach (var batch in GetValuesBatch())
    {
        await Task.Delay(1000);
        foreach (var value in batch)
        {
            yield return value;
        }
    }
}
IEnumerable<IEnumerable<int>> GetValuesBatch()
{
    yield return Enumerable.Range(0, 3);
    yield return Enumerable.Range(3, 3);
    yield return Enumerable.Range(6, 3);
}

输出:

Completed synchronously: False
Value=0
Completed synchronously: True
Value=1
Completed synchronously: True
Value=2
Completed synchronously: False
Value=3
Completed synchronously: True
Value=4
Completed synchronously: True
Value=5
Completed synchronously: False
Value=6
Completed synchronously: True
Value=7
Completed synchronously: True
Value=8
Completed synchronously: True

1
注意:方法 IAsyncEnumerator<T>.MoveNextAsync 返回一个 ValueTask<bool>,对于 ValueTask<T> 做除了等待它(一次)或调用其 AsTask 方法之外的任何操作都违反了该类型的契约。这包括查询其 IsCompleted 属性。 - Theodor Zoulias
没错,@TheodorZoulias。但是,在这种情况下,该代码在等待或再次调用MoveNextAsync之后并未保留ValueTask - Paulo Morgado
这行代码怎么样:Console.WriteLine($"Completed synchronously: {task.IsCompleted}"); - Theodor Zoulias
1
现在我想起来,调用 IsCompleted 属性应该是安全的。否则,它就没有存在的理由。但是我不敢百分之百确定。 - Theodor Zoulias
2
这不是唯一的限制。在完成之前查询其“Result”属性是不允许的,两次等待也是不允许的,两次调用“AsTask”也是不允许的等等。 - Theodor Zoulias
显示剩余2条评论

1

我认为,“我想知道IAsyncEnumerable是否不适合这种情况”的问题的答案在@Bizhan的分批示例和随后的讨论中有点被忽略了,但是为了重申一下那篇文章:

IAsyncEnumerable<T>异步检索数据,尽可能快地提供单个值

OP正在测量读取所有记录的总时间,并忽略第一条记录被检索并准备好由调用代码使用的速度。

如果“这种情况”意味着尽快将所有数据读入内存,则IAsyncEnumerable不适合此目的。

如果在等待读取所有记录之前开始处理初始记录很重要,则IAsyncEnumerable最适合该任务。

然而,在现实世界中,您应该真正测试整个系统的性能,这将包括对数据进行实际处理而不仅仅是将其输出到控制台。特别是在多线程系统中,可以通过尽快同时开始处理多个记录并同时从数据库中读取更多数据来获得最大的性能提升。相比之下,等待单个线程预先读取所有数据(假设您可以将整个数据集放入内存中),然后才能开始处理它。

另一个例子,在asp.net 6中,控制器可以返回IAsyncEnumerable,框架将在获取结果时流式传输生成的json。这既可以提高第一个对象的时间,也可以在传输受I/O限制时提高总时间。 - Jeremy Lakeman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接