使用yield return的异步任务如何返回IEnumerable类型?

23
下面的方法无法编译。有什么替代方案吗?
public static async Task<IEnumerable<object[]>> GetRecordsAsync(
    this Transaction transaction,
    string commandText,
    params SqlParameter[] parameters)
{
    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);
    var fieldCount = -1;
    // Begin iterating through records asynchronously
    while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
    {
        // Grab all the field values out
        if (fieldCount < 0)
            fieldCount = reader.FieldCount;
        var fields = new object[fieldCount];
        reader.GetValues(fields);
        // Yield return the field values from this record
        yield return fields;
    }
}

错误信息:
“TransactionExtensions.GetRecordsAsync(Transaction, string, params SqlParameter[])” 的主体不能是迭代器块,因为“Task>”不是迭代器接口类型。
我看不出如何将此回答适应类似但不同的问题,因为我不知道循环将进行多少次。
编辑:已修正格式。

6
IEnumerable<T>本身不支持该功能。请使用响应式扩展(Reactive Extensions)。 - SLaks
1
你可以使用 ObservableCollection 监控添加的元素。创建并将其传递给 GetRecordsAsync,该方法现在只返回 Task,并在准备好产生 fields 时将其添加到其中。现在我想起来,也可以通过将 "on fields received" 委托直接传递给该方法来实现。 - IS4
@IllidanS4 我认为这归结于SLaks所提供的评论。两个好主意,但是Reactive Extensive带来了许多其他好处。 - Matt Thomas
2
@MattThomas,还可以通过使用async/await与DataReader(无中间缓冲区!)来查看一些替代想法。 - noseratio - open to work
1
@Noseratio,感谢提供链接。我从中得到的最佳选择是Rx。对我来说,答案感觉像是Rx(或更一般地说,发布-订阅)所做的事情。 - Matt Thomas
@SLaks 如果您将其发布为答案,我会接受它。谢谢! - Matt Thomas
3个回答

12

根据@SLaks对问题的评论,这里提供一种使用响应式扩展的通用替代方案:

/// <summary>
/// Turns the given asynchronous functions into an IObservable
/// </summary>
static IObservable<T> ToObservable<T>(
    Func<Task<bool>> shouldLoopAsync,
    Func<Task<T>> getAsync)
{
    return Observable.Create<T>(
        observer => Task.Run(async () =>
            {
                while (await shouldLoopAsync())
                {
                    var value = await getAsync();
                    observer.OnNext(value);
                }
                observer.OnCompleted();
            }
        )
    );
}

示例用法,旨在解决问题的特定情况:

/// <summary>
/// Asynchronously processes each record of the given reader using the given handler
/// </summary>
static async Task ProcessResultsAsync(this SqlDataReader reader, Action<object[]> fieldsHandler)
{
    // Set up async functions for the reader
    var shouldLoopAsync = (Func<Task<bool>>)reader.ReadAsync;
    var getAsync = new Func<SqlDataReader, Func<Task<object[]>>>(_reader =>
    {
        var fieldCount = -1;
        return () => Task.Run(() =>
        {
            Interlocked.CompareExchange(ref fieldCount, _reader.FieldCount, -1);
            var fields = new object[fieldCount];
            _reader.GetValues(fields);
            return fields;
        });
    })(reader);

    // Turn the async functions into an IObservable
    var observable = ToObservable(shouldLoopAsync, getAsync);

    // Process the fields as they become available
    var finished = new ManualResetEventSlim(); // This will be our signal for when the observable completes
    using (observable.Subscribe(
        onNext: fieldsHandler, // Invoke the handler for each set of fields
        onCompleted: finished.Set // Set the gate when the observable completes
    )) // Don't forget best practice of disposing IDisposables
        // Asynchronously wait for the gate to be set
        await Task.Run((Action)finished.Wait);
}

(请注意,上面代码块中的getAsync可以简化,但我喜欢它明确地说明正在创建的闭包)。
......最后:
// Get a SqlDataReader
var reader = await transaction.GetReaderAsync(commandText, parameters);
// Do something with the records
await reader.ProcessResultsAsync(fields => { /* Code here to process each record */ });

11
不要返回一个 Task<IEnumerable<T>>,也不要使用 Task 来实现;相反,应返回一个 IAsyncEnumerable<T>。无需使用第三方库或其他解决方法,甚至无需修改原始方法的主体内容。
public static async IAsyncEnumerable<object[]> GetRecordsAsync(
    this Transaction transaction,
    string commandText,
    params SqlParameter[] parameters)
{
    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);
    var fieldCount = -1;
    // Begin iterating through records asynchronously
    while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
    {
        // Grab all the field values out
        if (fieldCount < 0)
            fieldCount = reader.FieldCount;
        var fields = new object[fieldCount];
        reader.GetValues(fields);
        // Yield return the field values from this record
        yield return fields;
    }
}

对于那些没有使用Core 3.0或更高版本的人,可以通过Microsoft.Bcl.AsyncInterfaces NuGet包获得此功能。您还需要C# 8或更高版本,因此我们仍在Framework中摸索的人需要进行一些手动项目编辑。 - KeithS

0

我没有使用第三方扩展程序就解决了它:

public async Task<IEnumerable<Item>> GetAllFromDb()
{
    OracleConnection connection = null;
    DbDataReader reader = null;
    try
    {
        connection = new OracleConnection(connectionString);
        var command = new OracleCommand(queryString, connection);
        connection.Open();

        reader = await command.ExecuteReaderAsync();

        return this.BuildEnumerable(connection, reader);
    }
    catch (Exception)
    {
        reader?.Dispose();
        connection?.Dispose();          
        throw;
    }
}

private IEnumerable<Item> BuildEnumerable(OracleConnection connection, DbDataReader reader)
{
    using (connection)
    using (reader)
    {
        while (reader.Read())
        {
            var item = new Item()
            {
                Prop = reader.GetString(0),
            };
            yield return item;
        }
    }
}

这个例子是针对Oracle Data Reader的,但是相同的方法也适用于任何与yield return结合使用的异步操作。


3
但是在你调用reader.Read()的核心部分,它是一个阻塞方法。虽然代码可以编译和运行,但我认为你并没有真正体验到异步操作带来的好处。 - H H
只是想指出我们确实有异步操作: reader = await command.ExecuteReaderAsync(); 因此,如果正确实现,此操作将解除线程阻塞。关于读取操作 - 我同意@Henk-Holterman的看法,它是同步的。但如果您想使用标准C#,Linq,foreach等,这是最好的权衡选择。 - VeganHunter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接