将流处理为IAsyncEnumerable - 流不可读

3

我有一个工作流程,试图实现以下目标:

  • 一个接受回调函数的方法,该方法内部生成一个Stream,该方法的调用者可以使用回调函数以任何希望的方式处理Stream
  • 在特定情况下,调用者使用回调函数从Stream中生成IAsyncEnumerable

下面是一个最小化的重现示例:

class Program
{
    private static async Task<Stream> GetStream()
    {
        var text =
            @"Multi-line
            string";

        await Task.Yield();

        var bytes = Encoding.UTF8.GetBytes(text);
        return new MemoryStream(bytes);
    }

    private static async Task<T> StreamData<T>(Func<Stream, T> streamAction)
    {
        await using var stream = await GetStream();
        return streamAction(stream);
    }

    private static async Task StreamData(Func<Stream, Task> streamAction)
    {
        await using var stream = await GetStream();
        await streamAction(stream);
    }

    private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
    {
        using var reader = new StreamReader(stream);

        var line = await reader.ReadLineAsync();
        while (line != null)
        {
            yield return line;
            line = await reader.ReadLineAsync();
        }
    }

    private static async Task Test1()
    {
        async Task GetRecords(Stream str)
        {
            await foreach(var line in GetTextLinesFromStream(str))
                Console.WriteLine(line);
        }

        await StreamData(GetRecords);
    }

    private static async Task Test2()
    {
        await foreach(var line in await StreamData(GetTextLinesFromStream))
            Console.WriteLine(line);
    }

    static async Task Main(string[] args)
    {
        await Test1();
        await Test2();
    }
}  

这里,方法Test1可以正常工作,而Test2则无法运行,出现Stream is not readable的错误。问题在于第二种情况下,当代码真正处理流时,该流已经被关闭了。

可以推测,第一种情况与第二种情况的区别在于,在第一种情况下,读取流时仍处于可处理的stream上下文中,而在第二种情况下我们已经超出了该上下文。

然而,我认为第二种情况也是有效的——至少我觉得它非常符合C#语言特色。是否有任何遗漏的内容可以使第二种情况也能正常工作吗?

1个回答

2
Test2方法的问题在于,在创建IAsyncEnumerable<string>时,Stream已经被处理,而不是在枚举完成时处理。

Test2方法使用了第一个StreamData重载,它返回一个Task<T>。在这种情况下,T是一个IAsyncEnumerable<string>。因此,StreamData方法返回一个生成异步序列的任务,并立即处理流(在生成序列后)。显然,这不是处理流的正确时机。正确的时机应该是在await foreach循环完成后。

为了使Test2透明地工作,您应该添加StreamData方法的第三个重载,它返回一个Task<IAsyncEnumerable<T>>(而不是TaskTask<T>)。此重载应返回一个专门与可处理资源相关联的异步序列,并在其枚举完成时处理此资源。以下是这种序列的实现:

public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly IAsyncDisposable _disposable;

    public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,
        IAsyncDisposable disposable)
    {
        // Arguments validation omitted
        _source = source;
        _disposable = disposable;
    }

    async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
        CancellationToken cancellationToken)
    {
        await using (_disposable.ConfigureAwait(false))
            await foreach (var item in _source
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false)) yield return item;
    }
}

您可以在StreamData方法中这样使用它:
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return new AsyncEnumerableDisposable<T>(streamAction(stream), stream);
}

请注意,通常情况下,IAsyncEnumerable<T> 可以在其生命周期内被多次枚举,并通过将其包装到 AsyncEnumerableDisposable<T> 中,它基本上被减少为单个枚举序列(因为资源将在第一次枚举后被处理)。
替代方案: System.Interactive.Async 包中包含了 AsyncEnumerableEx.Using 运算符,可以用来代替自定义的 AsyncEnumerableDisposable 类。
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return AsyncEnumerableEx.Using(() => stream, streamAction);
}

区别在于Stream将通过其Dispose方法同步处理。据我所知,此包中没有支持处理IAsyncDisposable的方法。
这是AsyncEnumerableEx.Using方法的签名:
// Constructs an async-enumerable sequence that depends on a resource object, whose
// lifetime is tied to the resulting async-enumerable sequence's lifetime.
public static IAsyncEnumerable<TSource> Using<TSource, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory)
    where TResource : IDisposable;

1
感谢您提供了详细的解释和建议的解决方案 - 我倾向于认为这可能是最接近我想要实现的效果的方法。遗憾的是,这需要更多专门针对StreamData方法的覆盖。无论如何,我将接受这个作为答案。 - zidour
@zidour,我认为仅使用“ Task<T> StreamData<T>(Func<Stream, T> streamAction)”重载是不够的。这个方法过于通用,没有提供机制来指示正确的时间来处理“ Stream”对象。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接