如何使用SqlDataReader返回和消费IAsyncEnumerable

5
请看下面两种方法。第一个返回一个IAsyncEnumerable。第二个尝试使用它消费。
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SqlUtility
{
    public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
        string connectionString, SqlParameter[] parameters, string commandText,
        [EnumeratorCancellation]CancellationToken cancellationToken)
    {
        using (SqlConnection connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            using (SqlCommand command = new SqlCommand(commandText, connection))
            {
                command.Parameters.AddRange(parameters);
                using (var reader = await command.ExecuteReaderAsync()
                    .ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        yield return reader;
                    }
                }
            }
        }
    }

    public static async Task Example()
    {
        const string connectionString =
            "Server=localhost;Database=[Redacted];Integrated Security=true";
        SqlParameter[] parameters = new SqlParameter[]
        {
            new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
        };
        const string commandText = "select * from Video where VideoID=@VideoID";
        IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
            parameters, commandText, CancellationToken.None);
        IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
        object videoID = firstRecord["VideoID"]; //Should be 1000.
        // Instead, I get this exception:
        // "Invalid attempt to call MetaData when reader is closed."
    }
}

当代码尝试读取结果的 IDataReader (在 object videoID = firstRecord["VideoID"]; 处)时,我收到了这个异常:

当读取器关闭时,调用 MetaData 是无效的。

这是因为 SqlDataReader 已被处理。有人能提供一种推荐的方法来以异步方式枚举 SqlDataReader,使得每个结果记录都可用于调用方法吗?谢谢。
6个回答

9

在这种情况下,LINQ不是你的朋友,因为FirstAsync会在返回结果之前关闭迭代器,这不是ADO.NET所期望的;基本上:不要在这里使用LINQ,或者至少不要以这种方式使用。您可以尝试像Select这样的东西,在序列仍然打开的情况下执行投影,或者可能更容易的方法是将所有工作转移到类似Dapper的工具中。或者,手动完成:

await foreach (var record in records)
{
    // TODO: process record
    // (perhaps "break"), because you only want the first
}

我现在明白了。谢谢你。你的回答以最简洁的方式填补了我所缺少的知识点。我使用了你告诉我的方法,在读取器被释放之前将记录数据缓存到字典中。 - user1325179

6

您可以避免这种情况,方法是不返回依赖于连接仍然打开的对象。例如,如果您只需要VideoID,那么只需返回它(我假设它是一个int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return reader["VideoID"];
    ...
}

或将其投射到您自己的类中:

public class MyRecord {
    public int VideoId { get; set; }
}

public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return new MyRecord {
                        VideoId = reader["VideoID"]
                    }
    ...
}

或者按照Marc的建议,使用foreachbreak,在第一个后面加上它,这在你的情况下看起来像这样:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
    videoID = record["VideoID"];
    break;
}

3
当您暴露一个开放的DataReader时,关闭它以及底层Connection的责任现在归调用者负责,因此您不应该处理任何内容。相反,您应该使用接受CommandBehavior参数的DbCommand.ExecuteReaderAsync重载,并传递CommandBehavior.CloseConnection值:

当命令被执行时,相关联的Connection对象会在相关联的DataReader对象关闭时关闭。

然后,您只需要希望调用者遵守规则并及时调用DataReader.Close方法,并且不会让连接保持打开状态直到对象被垃圾回收。因此,暴露开放的DataReader应该被视为一种极端的性能优化技术,应该谨慎使用。

顺便说一下,如果您返回一个IEnumerable<IDataRecord>而不是IAsyncEnumerable<IDataRecord>,您将面临同样的问题。


虽然我最终没有这样做,但我认为这是一个有趣的解决方案。你知道命令何时被处理吗?它是否只是在读取器和连接保持打开的情况下正常处理? - user1325179
如果您不关闭或处理连接,当垃圾回收器回收对象时,它将被关闭。确切的时间是未定义的,因为GC过程是非确定性的。 - Theodor Zoulias
谢谢。我明白了。我在询问命令。当读取器关闭时,连接将被关闭。我想知道这个命令。再次感谢。 - user1325179
所有一次性内置对象都是一样的。如果您不显式地处理它们,它们将在回收时被处理。 - Theodor Zoulias
好的。我在想当读取器被处理时,命令是否会随着连接一起被处理。也许我可以尝试在GetRecordsAsync中使用using语句正常处理命令。我不确定这是否会使读取器出现问题,因为它将在命令被处理后使用。也许有一天我会试验一下。谢谢。 - user1325179
啊,现在我明白你的问题了。据我所知,唯一需要尽快释放的关键资源是 SqlConnection,以便它可以返回到连接池并为其他请求提供服务。据我所知,SqlCommand 没有任何其他关键资源,因此在释放连接后处理它并不重要。 - Theodor Zoulias

2

在2021年的尾巴处,我有这个确切的问题。我找不到一个完整的例子,所以我只是试着用我能找到的东西摆弄,直到我得到了一些可以工作的东西。

这里是我的代码-完整的,虽然简单(所以您可以稍后扩展它)的例子,以及一些详细说明我走过的坑:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
    // we need a connection that will last as long as the reader is open,
    // alternately you could pass in an open connection.
    using SqlConnection connection = new SqlConnection( connectionString );
    using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );

    await connection.OpenAsync();
    var reader = await cmd.ExecuteReaderAsync();
    while( await reader.ReadAsync() )
    {
        yield return func( reader );
    }
}

接着在您的代码的任何其他(异步)部分,您可以在await foreach循环中调用您的函数:

private static async Task CallIAsyncEnumerable()
{
    await foreach( var category in ReadAsync( ReaderToCategory ) )
    {
        // do something with your category; save it in a list, write it to disk,
        // make an HTTP call ... the whole world is yours!
    }
}

// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
    return new Category()
    {
        Code = ( string )reader[ "Code" ],
        Group = ( string )reader[ "Group" ]
    };
}


我发现了另外几件事情:你不能在try块内使用yield,但是你可以把cmd.ExecuteReaderAsync()及其之前的所有内容放入try块中或将其放入一个单独的方法中返回DataReader。或者你可以将await foreach包装在try块中;我认为问题在于向try之外的调用者yield(如果你仔细想一下就会明白)。
如果您使用另一种方法来生成阅读器,请将连接传递到该方法中,以便您可以控制其生命周期。如果您的方法创建连接、执行命令并返回SqlDataReader,则连接将在您读取阅读器之前关闭(如果您使用了'using')。再次重申这一点是非常有道理的,但它让我困扰了几分钟。
祝你好运,希望这对未来的某个人有所帮助!

2

除了其他答案之外,您可以将实用方法泛型化,并添加投影委托,Func<IDataRecord,T> projection,作为参数,例如:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
    string connectionString, SqlParameter[] parameters, string commandText,
    Func<IDataRecord, T> projection, // Parameter here
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ...
                    yield return projection(reader); // Projected here
    ...
}

然后在调用时传递一个lambda表达式或引用一个方法组,例如:

public static object GetVideoId(IDataRecord dataRecord)
    => dataRecord["VideoID"];

such:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);

好主意。但这并不能阻止某人将 IDataRecord 投射到它本身 (GetRecordsAsync(..., x => x, ...),从而导致相同的问题。暴露 IDataRecord 本质上是有问题的。至少你的建议为那些知道自己在做什么并遵守规则的消费者提供了一种出路。 - Theodor Zoulias

0
我建议使用类似这样的代码:
public async IAsyncEnumerable<Customer> GetByCity(string city)
{
   const string sql = "SELECT * FROM Customers WHERE City = @City";
    
   using var command = connection.CreateCommand();
   command.CommandText = sql;
   command.Parameters.AddWithValue("@City", city);
    
   if (connection.State == ConnectionState.Closed)
     await connection.OpenAsync();
    
   using SqlDataReader reader = await command.ExecuteReaderAsync();
    
    while (await reader.ReadAsync())
    {
        yield return Map(reader);
    }
}
    
private static Customer Map(SqlDataReader reader) => new Customer
{
   FirstName = (string) reader["FirstName"],
   LastName  = (string) reader["LastName"]
}
     

await foreach (var customer in customerRepository.GetByCity("Warsaw"))
{
   // ...
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接