使用取消标记的NetworkStream.ReadAsync从未取消

49

这里是证明。
有任何想法,这段代码有什么问题吗?

    [TestMethod]
    public void TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        bool ok = Read(tcp.GetStream()).Wait(30000);
        Assert.IsTrue(ok);
    }

    async Task Read(NetworkStream stream)
    {
        using (var cancellationTokenSource = new CancellationTokenSource(5000))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }

1
你能描述一下这段代码应该做什么以及它实际上为你做了什么吗?你是如何运行你的代码的?如果你直接从控制台应用程序运行它,会有什么变化吗? - svick
1
重申一下:测试到底是做什么的? - Stephen Cleary
1
我遇到了与 HttpClient.GetStreamAsync(...).CopyToAsync(...) 相同的问题,它使用了 ReadOnlyStream 覆盖了 WebExceptionWrapperStreamConnectStream,但仍然存在 base.ReadAsync() 没有将 cancellationToken 传递到 BeginEndReadAsync() 中的情况。 - SerG
6个回答

39

我最终找到了一个解决方法。使用Task.WaitAny将异步调用与延迟任务(Task.Delay)组合。当延迟超过IO任务时,关闭流。这将强制该任务停止。您应该正确处理io任务的异步异常。并且您应该为延迟任务和io任务添加连续任务。

它也适用于TCP连接。在另一个线程中关闭连接(可以认为是延迟任务线程)会强制所有使用/等待此连接的异步任务停止。

--编辑--

@vtortola提出的另一种更简洁的解决方案:使用取消令牌注册对stream.Close的调用:

async ValueTask Read(NetworkStream stream, TimeSpan timeout = default)
{
    if(timeout == default(TimeSpan))
      timeout = TimeSpan.FromSeconds(5);

    using var cts = new CancellationTokenSource(timeout); //C# 8 syntax
    using(cts.Token.Register(() => stream.Close()))
    {
       int receivedCount;
       try
       {
           var buffer = new byte[30000];
           receivedCount = await stream.ReadAsync(buffer, 0, 30000, tcs.Token).ConfigureAwait(false);
       }
       catch (TimeoutException)
       {
           receivedCount = -1;
       }
    }
}

12
我找到的另一种解决方案是在取消标记的“Register”方法中注册对“Close”的调用。因此,当取消标记被取消时,它会自动调用“Socket.Close”方法。 - vtortola
2
一个小提示:在你的例子中,你是在方法内部创建令牌。通常情况下,你会将令牌作为参数传递进来,然后需要将调用Register的代码放在一个"using"块中,以确保在离开方法后取消注册调用,否则每次都会注册调用 :) - vtortola
1
我最终做了类似的事情,但是我写了一条空消息(前缀和后缀,但没有有效载荷)到流中,而不是调用stream.Close()来避免ObjectDisposed异常。 - Sheldon Neilson
1
值得注意的是,NetworkStream.Dispose() 方法(由 Close() 调用)没有被官方认定为线程安全。尽管如此,Mono 实现似乎有尝试做到线程安全,而且目前看起来该实现和 MS 实现都仅限于可能会出现 ObjectDisposedException 异常(或者可能是空指针引用?)。还值得注意的是,你确实没有更好的选择。无论如何,这是我现在正在做的事情。:o - tekHedd
1
需要记住的一件事是,这将在超时时关闭底层连接。在某些情况下,保持连接开放可能是有意义的。 - ZakiMa
显示剩余4条评论

23

取消操作是协作的。若要能够取消NetworkStream.ReadAsync,它必须进行协作。但这很困难,因为这可能会让流处于未定义状态。Windows TCP堆栈中已读取哪些字节,哪些还没有读取?IO很难被取消。

反射器显示NetworkStream未重写ReadAsync。这意味着它将获得Stream.ReadAsync的默认行为,即忽略令牌。没有通用的方式可以取消流操作,所以BCL Stream类甚至不会尝试(也不能尝试 — 这是无法实现的)。

您应该在Socket上设置超时时间。


9
很遗憾,我认为网络操作是取消的情况之一,因为它们可能需要很长时间才能完成。 - svick
2
是的。我希望BCL能够支持所有常见的IO操作的CancelIO。这将与CancellationToken完美地集成在一起。 - usr
3
文档指出,在使用异步方法时,套接字(Socket)类的所有超时时间都会被忽略。"超时时间:该选项仅适用于同步接收调用。" 因此,所有 .net 的异步网络方法都不可用,因为它们可能导致无限锁定。 - Softlion
1
这是一个关于同样问题的讨论线程链接:https://dev59.com/v2TWa4cB1Zd3GeqPF7On,其中指向了这个线程http://social.msdn.microsoft.com/Forums/da-DK/async/thread/54632b19-0e9c-4078-aa59-c4389e75b187,解释了CancelIoEx函数允许取消单个操作,但是只在Vista中引入。但是.NET 4仍然支持XP-SP3,所以BCL不能轻易地使用该API。 - Softlion
1
@Softlion 的那个 API 点子非常好。异步套接字调用没有超时是不可思议的!我之前并不知道这一点。这意味着许多使用 Socket 的新异步 C# 应用程序将默认处于破损状态。 - usr
显示剩余3条评论

6
根据Softlion的回答描述:
使用Task.WaitAny将异步调用与延迟任务(Task.Delay)结合起来。当延迟在io任务之前过期时,关闭流。这将强制该任务停止。您应正确处理io任务上的异步异常。并且您应为延迟任务和io任务都添加一个继续任务。
我编写了一些代码,使您可以在超时情况下进行异步读取:
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace ConsoleApplication2013
{
    class Program
    {
        /// <summary>
        /// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
        /// </summary>
        /// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
        /// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
        /// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
        /// <param name="buffer"> The buffer to write the data into</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
        /// <param name="amountToRead">The maximum number of bytes to read</param>
        /// <returns>
        /// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
        /// and Item2 is set to the amount of data that was read when Item1 is true
        /// </returns>
        public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
        {
            Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
            Task timeoutTask = Task.Delay(timeoutMillis);

            int amountRead = 0;

            bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
            {
                if (completedTask == timeoutTask) //the timeout task was the first to complete
                {
                    //close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
                    s.Close();
                    return false; //indicate that a timeout occurred
                }
                else //the readTask completed
                {
                    amountRead = readTask.Result;
                    return true;
                }
            });

            return new Tuple<bool, int>(result, amountRead);
        }

        #region sample usage
        static void Main(string[] args)
        {
            Program p = new Program();
            Task.WaitAll(p.RunAsync());
        }

        public async Task RunAsync()
        {
            Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);

            Console.WriteLine("Connecting...");
            s.Connect("127.0.0.1", 7894);  //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
            Console.WriteLine("Connected!");

            NetworkStream ns = new NetworkStream(s);

            byte[] buffer = new byte[1024];
            Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
            Console.WriteLine("Read task created");

            Tuple<bool, int> result = await readWithTimeoutTask;

            Console.WriteLine("readWithTimeoutTask is complete!");
            Console.WriteLine("Read succeeded without timeout? " + result.Item1 + ";  Amount read=" + result.Item2);
        }
        #endregion
    }
}

5

有几个问题需要解决:

  1. CancellationToken throws OperationCanceledException, not TimeoutException (cancellation is not always due to timeout).
  2. ReceiveTimeout doesn't apply, since you're doing an asynchronous read. Even if it did, you'd have a race condition between IOException and OperationCanceledException.
  3. Since you're synchronously connecting the socket, you'll want a high timeout on this test (IIRC, the default connection timeout is ~90 seconds, but can be changed as Windows monitors the network speeds).
  4. The correct way to test asynchronous code is with an asynchronous test:

    [TestMethod]
    public async Task TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        await Read(tcp.GetStream());
    }
    

我知道。它用于为真实类中的内部方法提供超时,我在此问题中替换为实际值。 - Softlion
  1. 在VS2012上,异步测试存在错误。它们不能正常工作,并且测试资源管理器无法识别它们(所有项目均为 .net 4.5)。
- Softlion
@Softlion:这是我第一次听到有关异步测试错误的问题。你是否已经向Microsoft Connect报告了此问题? - Stephen Cleary
1
如果你搜索这个问题,你会找到一些关于它的博客。有些人通过将测试项目的 .net 框架类型更改为 4.5 来解决它。我的已经是 4.5 了。我在 4.0 中使用了一个依赖项,也许这就是问题所在。而且我之前报告给 connect 的所有错误都被“忽略/未计划/按设计来处理/...”,所以我再也不会在 connect 上发布任何东西了。 - Softlion
更新:异步测试运行良好,但无法正确报告堆栈跟踪,并且在使用调试时,它们会停止在错误的位置或不显示代码。最糟糕的是,当抛出异常时,它会在没有正确调用堆栈的情况下中断。这已在VS2013 + Windows8.1中得到修复。需要Windows 8.1才能获取异步操作的调用堆栈。 - Softlion
显示剩余2条评论

2
提供有关三种不同方法的更多背景信息。我的服务监视其他Web应用程序的可用性。因此,它需要建立许多与各种网站的连接。其中一些会崩溃/返回错误/变得无响应。
Y轴 - 挂起测试(会话)的数量。由部署/重新启动引起的下降。
I.(1月25日)在改进服务后,最初的实现使用了带有取消标记的ReadAsync。这导致许多测试挂起(运行请求针对这些网站显示服务器确实有时没有返回内容)。
II.(2月17日)部署了一个更改,该更改使用Task.Delay保护取消。这完全解决了此问题。
private async Task<int> StreamReadWithCancellationTokenAsync(Stream stream, byte[] buffer, int count, Task cancellationDelayTask)
{
    if (cancellationDelayTask.IsCanceled)
    {
        throw new TaskCanceledException();
    }

    // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
    // operation is not guarded. As a result if remote server never responds and connection never closed
    // it will lead to this operation hanging forever.
    Task<int> readBytesTask = stream.ReadAsync(
        buffer,
        0,
        count);
    await Task.WhenAny(readBytesTask, cancellationDelayTask).ConfigureAwait(false);

    // Check whether cancellation task is cancelled (or completed).
    if (cancellationDelayTask.IsCanceled || cancellationDelayTask.IsCompleted)
    {
        throw new TaskCanceledException();
    }

    // Means that main task completed. We use Result directly.
    // If the main task failed the following line will throw an exception and
    // we'll catch it above.
    int readBytes = readBytesTask.Result;

    return readBytes;
}

三月三日,StackOverflow 开始根据超时时间关闭流。

using (timeoutToken.Register(() => stream.Close()))
{
    // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
    // operation is not guarded. As a result if a remote server never responds and connection never closed
    // it will lead to this operation hanging forever.
    // ReSharper disable once MethodSupportsCancellation
    readBytes = await targetStream.ReadAsync(
        buffer,
        0,
        Math.Min(responseBodyLimitInBytes - totalReadBytes, buffer.Length)).ConfigureAwait(false);
}

这种实现方式使程序再次出现了卡顿问题(虽然不像最初的方法那样严重):

enter image description here

"回到了使用 Task.Delay 方案。"

我认为Task.Delay的解决方案不好,因为读取任务会一直在后台运行/挂起。 - undefined
@maf-soft,除非有什么变化,这似乎仍然是唯一可靠的解决方案。其他讨论的解决方案可能仍然会出现问题。它们最终会完成(请参考其他解决方案的图表)。可能可以实现混合方案-通过Task.Delay进行停止并取消底层流。我猜这取决于性能要求。在上述服务中,这并不是那么关键。 - undefined
我在我的SerialPort案例中找到了一个似乎适合我的解决方案,并在这里分享了它: https://stackoverflow.com/a/77092594/1855801 - undefined
@maf-soft,谢谢你的分享!我觉得对于SerialPort来说是有道理的。至于HttpClient,我不太确定。我想将从网络流中读取的任务外包给HttpClient,而不是在主路径上引入Task.Delay。上述解决方案涉及到非正常路径(即极其罕见的情况)。 - undefined

1

提醒一下,await _stream.WriteAsync(message,cancellationToken);(_stream是SslStream)在执行BeginEndWriteAsync之前会在后台检查取消令牌是否已被取消,因此您必须在开始写入之前取消您的令牌。

public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        // If cancellation was requested, bail early with an already completed task.
        // Otherwise, return a task that represents the Begin/End methods.
        return cancellationToken.IsCancellationRequested
                    ? Task.FromCanceled(cancellationToken)
                    : BeginEndWriteAsync(buffer, offset, count);
    }

1
你说得对。我的错。不过,我认为我的回答仍然相关,因为问题更多地涉及取消标记。在涉及取消标记时,ReadAsync和WriteAsync的行为方式是相同的。 - Daniel Botero Correa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接