如何等待从IAsyncEnumerable<>返回的所有结果?

21

我正在研究C# 8.0中的新IAsyncEnumerable<T>功能。假设我有一个方法需要使用:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

我知道可以使用await foreach...语法。但是假设我的消费者需要在继续之前获取此函数的所有结果,最好的语法是什么?换句话说,我想要做到这样的效果:

var results = await GetAllResults();

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

如何正确地做到这一点?


Task.WaitAll() ?? - Azhar Khorasany
@AzharKhorasany 那个语法会是什么样子?我已经尝试过 Task.WhenAll(),但是无法让它工作。 - Shaul Behr
1
await foreach (var item in SomeBlackBoxFunctionAsync<T>()) myList.Add(item); - Dmitry Bychenko
从你的方法中返回任务,然后等待全部完成。 - Azhar Khorasany
为什么要在处理结果之前消耗整个流?按定义,异步流可能永远不会结束。 - Panagiotis Kanavos
2个回答

25

首先需要警告的是,按定义,异步流可能永远不会结束并一直产生结果,直到应用程序终止。例如,在SignalR或gRPC中已经使用了这种方式。轮询循环也是这样工作的。

因此,在异步流上使用ToListAsync可能会产生意想不到的后果!


像这样的运算符已经可以通过System.Linq.Async包进行使用。

通过ToListAsync可以消耗整个流。代码看起来非常简单,但隐藏着一些有趣的问题:

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

首先,它返回一个ValueTask。其次,它确保观察到取消并使用ConfigureAwait(false)以防止死锁。最后,如果源已经通过IAsyncIListProvider提供了自己的ToListAsync实现,操作符就会推迟执行。

值得注意的是,虽然IAsyncIListProvider接口是公共的,但它只由System.Linq.Async内部和私有类实现。


6

根据@DmitryBychenko的评论,我编写了一个扩展程序来实现我想要的功能:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    {
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        {
            list.Add(t);
        }

        return list;
    }

我有点惊讶,为什么C# 8.0没有原生支持这个功能...它看起来是一个非常明显的需求。


看一下这个 GitHub thread,基本上有一个社区支持的包和 repo,提供了对 AsyncEnumerable 的 linq 支持。或者使用 Rx 提供的 System.Linq.Async - Pavel Anikhouski
1
请为asyncEnumerable添加验证(因为AllResultsAsync是一个public方法) - 它不应该为null,并且我表示赞同。 - Dmitry Bychenko
1
它作为System.Linq.Async包的一部分被发布。 - Panagiotis Kanavos
@PanagiotisKanavos 很高兴听到这个消息!System.Linq.Async 中叫什么方法?如果有的话,那应该是我问题的正确答案。 - Shaul Behr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接