Queue
和 ConcurrentQueue
都实现了 IEnumerable
,但没有实现 IAsyncEnumerable
。是否存在一个标准类或者可以在 NuGet 上获得的类实现了 IAsyncEnumerable
,以便如果队列为空,则 MoveNextAsync
的结果不会在下一项被添加到队列之前完成?
Queue
和 ConcurrentQueue
都实现了 IEnumerable
,但没有实现 IAsyncEnumerable
。是否存在一个标准类或者可以在 NuGet 上获得的类实现了 IAsyncEnumerable
,以便如果队列为空,则 MoveNextAsync
的结果不会在下一项被添加到队列之前完成?
System.Threading.Tasks.Dataflow.BufferBlock<T>
类是TPL Dataflow库的一部分。它本身不支持IAsyncEnumerable<T>
,但它提供了可等待的OutputAvailableAsync()
方法,使得实现ToAsyncEnumerable
扩展方法变得非常简单。
System.Threading.Channels.Channel<T>
类是Channels库的核心组件。它通过其Reader.ReadAllAsync()
²方法公开了一个IAsyncEnumerable<T>
实现。
这两个类也适用于.NET Framework,只需安装不同的NuGet包即可。
BufferBlock<T>
的 IAsyncEnumerable<T>
实现:
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IReceivableSourceBlock<T> source,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (source.TryReceive(out T item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
¹ .NET 5之前不可用,从.NET 6开始可用。
² .NET Framework不可用,但可以以类似的方式轻松实现。
ReceiveAllAsync
,其签名如下:public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
this IReceivableSourceBlock<TOutput> source,
CancellationToken cancellationToken = default);
ReceiveAllAsync
(源代码)和上面展示的ToAsyncEnumerable
之间有两个区别。最重要的区别是source
数据流块的异常不会传播。因此,ReceiveAllAsync
API的使用者需要记住在枚举完成后await
source
的Completion
,否则将无法观察到异常。第二个区别是ReceiveAllAsync
缺少cancellationToken.ThrowIfCancellationRequested();
行,因此不能保证取消信号会立即生效。这种行为与ChannelReader.ReadAllAsync方法的行为一致,根据Microsoft的说法这是“按设计”。