如何在迭代器方法中使用`yield return`使`await …`正常工作?

20

我有一些现有的代码,看起来类似于:

IEnumerable<SomeClass> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    using (SqlCommand cmd = new SqlCommand(sql, conn)
    {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            SomeClass someClass = f(reader); // create instance based on returned row
            yield return someClass;
        }
    } 
}

看起来我可以通过使用reader.ReadAsync()受益。但如果我只是修改这一行:

        while (await reader.ReadAsync())

编译器提示我await只能在标记为async的方法中使用,并建议我修改方法签名为:
async Task<IEnumerable<SomeClass>> GetStuff()

然而,这样做会使GetStuff()无法使用,因为:

GetStuff()的主体不能是迭代器块,因为Task<IEnumerable<SomeClass>>不是一个迭代器接口类型。

我确定我在异步编程模型中缺少关键概念。

问题:

  • 我可以在我的迭代器中使用ReadAsync()吗? 如何操作?
  • 我如何以不同的方式思考异步范式,以便理解它在这种类型的情况下的工作原理?

迭代器块在这里真的有用吗?构建 List<SomClass> 并返回它几乎可以达到相同的效果,不是吗? - svick
@svick:不行,因为处理SomeClass很昂贵。迭代器块的结果由多个线程处理。构建完整列表会花费许多时间来处理生产数据,这是可以用来处理结果的时间。此外,完整列表将非常大,无谓地消耗内存。 - Eric J.
4个回答

22

问题在于你所询问的并没有太多意义。 IEnumerable<T> 是一个同步接口,返回 Task<IEnumerable<T>> 并不能帮到你太多,因为无论如何都需要有线程阻塞等待每个项。

实际上你想要返回的是一些异步替代 IEnumerable<T> 的方式:比如像 IObservable<T>、 TPL Dataflow 中的数据流块或者计划添加到 C# 8.0/.Net Core 3.0 中的 IAsyncEnumerable<T>。(同时,也有一些 含有这个功能。)

使用 TPL Dataflow,可以采用以下方式:

ISourceBlock<SomeClass> GetStuff() {
    var block = new BufferBlock<SomeClass>();

    Task.Run(async () =>
    {
        using (SqlConnection conn = new SqlConnection(connectionString))
        using (SqlCommand cmd = new SqlCommand(sql, conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                SomeClass someClass;
                // Create an instance of SomeClass based on row returned.
                block.Post(someClass);
            }
            block.Complete();
        } 
    });

    return block;
}

你可能需要在上述代码中添加错误处理,但除此之外,它应该能够工作并且完全是异步的。

接下来的代码将通过异步方式从返回的块中消耗项目,可能会使用 ActionBlock


我没有点踩,但是对于读者来说:Mike的回答更好,请往下看! - Csaba Toth
3
@CsabaToth,你能解释一下你为什么这么认为吗?而且Mike在他的回答中提到了两个备选方案,你指的是哪一个?(此外,我倾向于给那些实际上是错误的或者促进不良做法的回答点踩,而不只是因为我认为其他答案更好。但当然你可以自行决定如何投票。) - svick
从表面上看,只是刚开始研究TPL Dataflow,这似乎是一个更好的解决方案,因为我了解到Dataflow旨在解决这种问题(并且相对最优)。 - El Zorko
@svick,有没有一种不消耗线程池中线程的方法来完成这个任务?还是说我应该提一个新问题? - Marc L.
@MarcL。我的示例代码在等待来自SQL服务器的数据时不会消耗线程,这就是await的工作原理。 - svick
显示剩余4条评论

20

不,您目前无法在迭代器块中使用异步。正如svick所说,您需要类似于IAsyncEnumerable的东西来实现这一点。

如果函数返回值是Task<IEnumerable<SomeClass>>,那么意味着该函数返回一个单独的Task对象,该对象一旦完成,将为您提供一个完整的IEnumerable(在此可枚举的内容不包含Task)。任务对象完成后,调用方应该能够同步地迭代在可枚举中返回的所有项。

以下是一个返回Task<IEnumerable<SomeClass>>的解决方案。您可以通过执行类似于此类操作获得异步的大部分好处:

async Task<IEnumerable<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            return ReadItems(reader).ToArray();
        }
    }
}

IEnumerable<SomeClass> ReadItems(SqlDataReader reader)
{
    while (reader.Read())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        yield return someClass;
    }
}

...以及一个使用示例:

async void Caller()
{
    // Calls get-stuff, which returns immediately with a Task
    Task<IEnumerable<SomeClass>> itemsAsync = GetStuff();
    // Wait for the task to complete so we can get the items
    IEnumerable<SomeClass> items = await itemsAsync;
    // Iterate synchronously through the items which are all already present
    foreach (SomeClass item in items)
    {
        Console.WriteLine(item);
    }
}

这里你有迭代器部分和异步部分在不同的函数中,这允许你同时使用异步和yield语法。 GetStuff 函数以异步方式获取数据,然后 ReadItems 同步地将数据读取到可枚举对象中。

注意 ToArray() 的调用。这是必要的,因为枚举器函数执行惰性计算,所以您的异步函数可能会在所有数据被读取之前释放连接和命令。这是因为 using 块覆盖了 Task 执行的持续时间,但是您会在任务完成后对其进行迭代。

此解决方案不使用 ReadAsync,但使用 OpenAsyncExecuteReaderAsync,这可能会给您带来最大的好处。根据我的经验,ExecuteReader 花费最长的时间并且受益最大的就是异步操作。当我读取第一行时,SqlDataReader 已经有了其他所有行,并且 ReadAsync 只是同步返回。如果您也是这种情况,则移动到基于推送的系统(例如 IObservable<T>)不会带来显著的好处(这将需要对调用函数进行重大修改)。

为说明,考虑一种解决此问题的替代方法:

IEnumerable<Task<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            conn.Open();
            SqlDataReader reader = cmd.ExecuteReader();
            while (true)
                yield return ReadItem(reader);
        }
    }
}

async Task<SomeClass> ReadItem(SqlDataReader reader)
{
    if (await reader.ReadAsync())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        return someClass;
    }
    else
        return null; // Mark end of sequence
}

...以及一个使用示例:

async void Caller()
{
    // Synchronously get a list of Tasks
    IEnumerable<Task<SomeClass>> items = GetStuff();
    // Iterate through the Tasks
    foreach (Task<SomeClass> itemAsync in items)
    {
        // Wait for the task to complete. We need to wait for 
        // it to complete before we can know if it's the end of
        // the sequence
        SomeClass item = await itemAsync;
        // End of sequence?
        if (item == null) 
            break;
        Console.WriteLine(item);
    }
}
在这种情况下,GetStuff 立即返回一个可枚举的对象,其中可枚举对象中的每个项目都是一个任务,在该任务完成后将呈现一个 SomeClass 对象。 这种方法有一些缺陷。首先,枚举对象同步返回,因此在它返回时我们实际上不知道结果中有多少行,这就是为什么我将其制作成无限序列的原因。这是完全合法的,但它具有一些副作用。 我需要使用null来表示无限序列中有用数据的结束。其次,您必须小心地迭代它。您需要向前迭代它,并在迭代到下一行之前等待每一行。在所有任务完成之前,您还必须仅处置迭代器,以便 GC 在使用完毕之前不收集连接。出于这些原因,这不是一个安全的解决方案,我必须强调,我包含它只是为了帮助回答您的第二个问题。

1
我认为你错误地认为SqlConnection.Dispose()在你的第二个版本中不会被调用。连接在迭代器块方法中使用了using,所以当枚举器被处理时,它将被处理,这会在foreach结束时自动发生。你可以通过将可释放对象传递给ReadItems()来在第一个版本中使用类似的方法。 - svick
是的,我认为你是正确的。我已经从答案中删除了那个声明。 - Mike

2

从C# 8开始,可以使用IAsyncEnumerable实现此操作。

修改后的代码:

async IAsyncEnumerable<SomeClass> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    using (SqlCommand cmd = new SqlCommand(sql, conn)
    {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            SomeClass someClass = f(reader); // create instance based on returned row
            yield return someClass;
        }
    } 
}

这样使用它:

await foreach (var stuff in GetStuff())
    ...

2
在我的经验中,严格来说,在SqlCommand的上下文中异步迭代器(或者它们的可能性)方面,我注意到同步版本的代码在速度和内存消耗方面都比其async对应物优越得多。

也许,要带着一颗谨慎的心态看待这个观察结果,因为测试范围仅限于我的机器和本地SQL Server实例。

不要误会,async/await范式在.NET环境中非常简单、强大和有用,在适当的情况下非常实用。然而,经过多番努力,我并不认为数据库访问是它的正确使用场景。除非您需要同时执行多个命令,在这种情况下,您可以简单地使用TPL将命令一起启动。
相反,我更喜欢采取以下考虑:
  • 保持SQL工作的单元小、简单和可组合(即使您的SQL执行“便宜”)。
  • 避免在SQL Server上进行可以向应用程序级别推送的工作。这种情况的完美例子是排序。
  • 最重要的是,在规模上测试您的SQL代码并查看统计IO输出/执行计划。当有100万条记录时,以10k条记录运行迅速的查询可能会(而且很可能会)表现完全不同。

您可以认为在某些报告场景中,上述要求都不可能实现。但是,在报告服务的异步性方面真的需要吗?

微软布道师Rick Anderson有一篇关于这个问题的精彩文章。请注意,它很旧(来自2009年),但仍然非常相关。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接