SslStream.ReadAsync从不调用InnerStream.ReadAsync。

3
我不确定是我漏掉了什么还是SslStream(以及可能包装内部流的其他流类)确实存在设计缺陷。

考虑以下伪代码:

MyStream
{
  Task<int> ReadAsync()
  { ... }
}

...

{
  MyStream my = new MyStream();
  SslStream ssl = new SslStream(my);
  ssl.ReadAsync(...);
}

尽管人们可能期望SslStream.ReadAsync最终会调用MyStream.ReadAsync,但实际上它不会这样做。相反,它会调用MyStream.BeginRead(如果定义了)。如果未定义MyStream.BeginRead,则行为将难以预测(这将取决于从哪个类派生MyStream等)。
简而言之,要使SslStream的异步/等待方法按预期工作,需要实现内部流类的BeginXXX / EndXXX(非异步/等待方法)。
与异步/等待模式相比,BeginXXX / EndXXX模式的开发要复杂得多(对我来说,这是引入异步/等待的原因-使异步编程更容易)。但是,仍需开发BeginXXX / EndXXX方法的要求破坏了异步/等待的目的。
此外,还需要知道SslStream类的内部实现(因为如果实现方式不同,它可能直接调用InnerStream.ReadAsync)。我的意思是,SslStream的公共签名并没有清楚地为我提供足够的信息,告诉我是否应在内部流类中实现ReadAsync或BeginRead。
为此,我需要使用试错方法或检查SslStream的源代码(以及其Stream父项,因为SslStream从基本Stream类继承ReadAsync)。这似乎不是编写代码的可靠和直接的方法。
SslStream / Stream类中的异步/等待方法(例如ReadAsync)的当前实现是否有原因?
3个回答

6

是的,Stream 特别混乱,因为它已经存在很长时间了,早于 async/await。例如,ReadAsync 的默认实现实际上会在线程池线程上进行阻塞读取。

我建议您将 ReadAsync 覆盖为常规的 TAP (基于任务的异步模式) 方法,并且将 BeginRead/EndRead 包装成该 TAP 方法的 APM (异步编程模型) 封套。 MSDN 文档 对此有最好的模式(正确地处理 callbackstate),只是我更喜欢微调一下,以便不将 EndRead 中的任何异常包装在 AggregateException 中:

public static IAsyncResult ToBegin<T>(
    Task<T> task, AsyncCallback callback, object state)
{
  var tcs = new TaskCompletionSource<T>(state);
  task.ContinueWith(t =>
  {
    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions)
    else if (t.IsCanceled) tcs.TrySetCanceled();
    else tcs.TrySetResult(t.Result);

    if (callback != null) callback(tcs.Task);
  }, TaskScheduler.Default);
  return tcs.Task;
}

public static T ToEnd<T>(IAsyncResult result)
{
  // Original MSDN code uses Task<T>.Result
  return ((Task<T>)result).GetAwaiter().GetResult();
}

不提供 CancellationToken.None 给 continuation 安全吗?我总是对这样的东西有点模糊... 如果 task 有一个 CancellationToken,取消它会阻止 continuation,对吧?这可能会引起一些问题(微小?)。哦,我喜欢使用 GetAwaiter 的技巧:D 它是否正确地保留了原始调用堆栈? - Luaan
在这种情况下,连续运行不考虑任务的结束状态。TCS任务最终将具有不同的“CancellationToken”与“ReadAsync”任务,但通常并不重要。是的,“GetAwaiter().GetResult()”确实保留了原始调用堆栈-这基本上就是“async”状态机正在做的事情。有点丑陋,但它有效。 :) - Stephen Cleary
哦,那很丑陋 - 但对于框架非常有用。您认为在通常需要在抛出之前包装未包装异常的其他情况下使用此方法是否有意义?或者这太过hacky了?:D - Luaan
我发现自己现在到处都在使用它。控制台主要方法等。 - Stephen Cleary

3

Task 实现了 IAsyncResult 接口,因此您可以使用

return ReadAsync(...);

作为实现BeginRead的一部分。也许它有点复杂,例如您需要将回调连接作为该任务的继续进行。但您可以重用代码。
顺便说一句,在BeginReadReadAsync执行不同操作时,您违反了Stream API契约。严格来说,这是您的错,而不是框架的问题。

我的意思是,SslStream的公共签名并没有清楚地为我提供足够的信息,告诉我是否应该在我的内部流类中实现ReadAsync或BeginRead。

无需知道。当您从一个类继承时,您继承了该类的所有义务。您必须实现所有内容,因为这是您向该类的用户承诺的。显而易见的例子:如果您从IEnumerator派生,您不能只实现MoveNext而省略Current并期望调用者正常工作。

当你从一个类继承时,你会继承这个类所有的义务。我明白我必须这样做。我知道如何解决我的问题(包括实现所有方法,包括BeginXXX方法)。 我只是认为async/await被添加是为了摆脱处理BeginXXX,但它并没有起到帮助作用。这就是我所考虑的设计缺陷。 需要我实现一些已经被新功能取代的东西的设计缺陷。但是你说我可以重用我的async代码,返回Task作为IAsyncResult是有帮助的!谢谢。 - Alex
没错。通常最好的目标是尽量减少复杂性,而异步操作会增加复杂性。因此,您可以考虑采用同步方式,这样许多问题就会消失。 - usr
@Alex 但是每次使用API时,它确实可以帮助您解决这个问题。唯一无法避免旧异步模式的是实现新流(例如)- 这仍意味着您将“旧的、复杂”的内容集中在一个小方法中,而其他所有内容都很好地使用await。默认的ReadAsync调用BeginRead,因此您可以轻松地在编写Task/await支持之前的流上使用await。只是因为其他异步流的内部实现仍在使用它,所以没有办法摆脱BeginRead - Luaan
仍然有麻烦。在我的BeginRead中调用我的ReadAsync(其中我的ReadAsync内部调用base.ReadAsync)显然会导致堆栈溢出,因为在base.ReadAsync内部调用了我的BeginRead。似乎唯一的方法是提供XXXAsync和BeginXXX方法的独立实现。 - Alex
1
@Alex,你需要一个实现,但需要覆盖所有方法。 - usr
@usr 再次感谢您的帮助。实际上我一直在覆盖它们,但是我犯了一个愚蠢的错误:在 ReadAsync 中,我调用了 BeginRead(递归调用了 ReadAsync),而不是 base.BeginRead。现在它正常工作了。 - Alex

2

ReadAsync只是深层异步API的辅助包装器。

如果要覆盖异步行为,最好的方法是覆盖BeginReadEndRead,因为ReadAsync的默认实现将在它们之上构建任务(并且其他BeginRead会调用基础流的BeginRead)。

这仅仅是因为ReadAsync后来才被添加的 - 实际上,在某些平台上,它们还不可用。但最终,期望ReadAsync作为重载工作与期望它自动使用你已经重载的Read非常相似 - 它们并不真正适配。

但不要担心 - 这并不会使使用基于Task的API更加困难! Task本身实现了IAsyncResult。因此,在重写BeginRead方法时,只需让它返回ReadAsync :)

简化的样例实现:

class MyStream : Stream
{
    public override async Task<int> ReadAsync
      (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        const string str = "Hi there!\r\n";

        await Task.Delay(1000);

        return Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, offset);
    }

    public override IAsyncResult BeginRead
      (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
    {
        return ReadAsync(buffer, offset, count).ContinueWith(t => callback(t));
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        // Stolen from Stephen. Nicer than rethrowing the inner exception manually :)
        return ((Task<int>)asyncResult).GetAwaiter().GetResult();
    }
}

显然,将其转换为基类非常容易 - 允许您仅在子类中重写 ReadAsync 方法。如果这不是一个选项,您可以在扩展方法中管理功能(在简化的代码中看起来并不重要,但您可能想在实际代码中进行一些检查和错误处理;更重要的是,您确实希望处理状态 - 对于我的测试案例,实现它是不必要的,但这只是违反 API 的另一件事)。
正如 @usr 指出的那样,你错过的点是,让你的流行为保持一致是你的责任。这就像你只重载了 == 运算符而没有重载 EqualsGetHashCode - 产生的不一致是你的错,因为你应该确保一致性。另一方面,如果你只重载了 BeginRead,它也能正常工作,因为 ReadAsync 的默认实现调用 BeginRead - 代码必须保持向后兼容。但出于完全相同的原因,它不能以相同的方式工作。
编辑:好吧,我已经写了一些代码,即使传递了一个 state,它也应该能够正常工作(现在已更新为两个 TaskTask<T>)。
static class Extensions
{
    struct Unit { }

    public static IAsyncResult Apmize<T>(this Task<T> @this, AsyncCallback callback, object state)
    {
        return @this.ApmizeInternal<T>(callback, state);
    }

    public static IAsyncResult Apmize(this Task @this, AsyncCallback callback, object state)
    {
        return @this.ApmizeInternal<Unit>(callback, state);
    }

    private static IAsyncResult ApmizeInternal<T>(this Task @this, AsyncCallback callback, object state)
    {
        if (@this.AsyncState == state)
            return @this.ContinueWith(t => callback(t));

        var tcs = new TaskCompletionSource<T>(state);

        @this.ContinueWith
            (
                t =>
                {
                    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                    else if (t.IsCanceled) tcs.TrySetCanceled();
                    else
                    {
                        if (t is Task<T>) 
                        {
                            tcs.TrySetResult(((Task<T>)t).Result);
                        }
                        else
                        {
                            tcs.TrySetResult(default(T));
                        }
                    }

                    if (callback != null) callback(tcs.Task);
                },
                CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default
            );

        return tcs.Task;
    }
}

使用方法如下:

public override IAsyncResult BeginRead
  (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    return ReadAsync(buffer, offset, count).Apmize(callback, state);
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    return WriteAsync(buffer, offset, count).Apmize(callback, state);
}

这可能会比较长,但只需使用一个可重复使用的扩展方法即可。并且它可以正确处理状态 :)


1
@Alex,你基本上有两个选择 - 要么创建另一个适用于“Task”的版本,要么使用类似于Task<object>或更好的Task<Unit>(其中Unit是一种“空”类型;无需公开它,始终只返回TaskIAsyncResult)。你无论如何都需要TaskCompletionSource,因此可以创建一个简单的方法重载,什么也不做,只调用Task<Unit>重载。 - Luaan
@Alex 当然不行,我并不是指你应该那样做。你需要在另一个任务中包装内部任务。但实际上,你并不真正“需要”那样做。因为你只需要暴露IAsyncResult - TaskTask<T> 都实现了它。为了保持代码DRY(Don't Repeat Yourself),你可以使帮助方法更加灵活 - 让我添加一个示例... - Luaan
很不幸,它无法编译。我无法理解它为什么能工作,因为Apmize<T>需要输入Task<T>,而不是Task。在我看来,“@this.ApmizeInternal<Unit>(callback, state)”意味着Unit将被传递给ApmizeInternal,但该方法本身仍必须针对Task<T>类型的实例进行调用,而不是Task。这就是编译器抱怨的原因。 - Alex
1
@Alex 很好的问题。如果调用方不关心异步回调,我想你可以完全放弃 ContinueWith - 只需执行类似于 return callback == null ? @this : @this.ContinueWith(t => callback(t)); 的操作即可。这并不一定意味着您会失去异步支持 - 您仍然可以保持检查 IAsyncResult.IsCompleted 或只使用 IAsyncResult.AsyncWaitHandle 来注册自己的回调函数,例如。 - Luaan
1
@Alex 嗯,很久没见了……但我不这么认为。那只是针对state不匹配的情况 - 我们基本上必须解开实际进行的任务并调用回调函数。如果没有状态和回调函数,您可以直接使用输入任务作为IAsyncResult - Luaan
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接