是否有一个类似于队列(Queue)并实现了IAsyncEnumerable接口的C#类?

4

QueueConcurrentQueue 都实现了 IEnumerable,但没有实现 IAsyncEnumerable。是否存在一个标准类或者可以在 NuGet 上获得的类实现了 IAsyncEnumerable,以便如果队列为空,则 MoveNextAsync 的结果不会在下一项被添加到队列之前完成?


你总是可以自己找到答案:https://dev59.com/pHVD5IYBdhLWcg3wTJrF - Jim Mischel
1个回答

6
如果您正在使用.NET Core平台,至少有两个内置选项:
  1. System.Threading.Tasks.Dataflow.BufferBlock<T>类是TPL Dataflow库的一部分。它本身不支持IAsyncEnumerable<T>,但它提供了可等待的OutputAvailableAsync()方法,使得实现ToAsyncEnumerable扩展方法变得非常简单。

  2. System.Threading.Channels.Channel<T>类是Channels库的核心组件。它通过其Reader.ReadAllAsync()²方法公开了一个IAsyncEnumerable<T>实现。

这两个类也适用于.NET Framework,只需安装不同的NuGet包即可。

BufferBlock<T>IAsyncEnumerable<T> 实现:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}

¹ .NET 5之前不可用,从.NET 6开始可用。
² .NET Framework不可用,但可以以类似的方式轻松实现


.NET 6 更新:此功能现已本地化,使用扩展方法 ReceiveAllAsync,其签名如下:
public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);

新的API ReceiveAllAsync源代码)和上面展示的ToAsyncEnumerable之间有两个区别。最重要的区别是source数据流块的异常不会传播。因此,ReceiveAllAsync API的使用者需要记住在枚举完成后awaitsourceCompletion,否则将无法观察到异常。第二个区别是ReceiveAllAsync缺少cancellationToken.ThrowIfCancellationRequested();行,因此不能保证取消信号会立即生效。这种行为与ChannelReader.ReadAllAsync方法的行为一致根据Microsoft的说法这是“按设计”。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接