TcpListener:如何在等待AcceptTcpClientAsync()时停止监听?

23

我不知道如何在异步方法等待传入连接时正确关闭TcpListener。 我在SO上找到了这段代码,以下是代码:

public class Server
{
    private TcpListener _Server;
    private bool _Active;

    public Server()
    {
        _Server = new TcpListener(IPAddress.Any, 5555);
    }

    public async void StartListening()
    {
        _Active = true;
        _Server.Start();
        await AcceptConnections();
    }

    public void StopListening()
    {
        _Active = false;
        _Server.Stop();
    }

    private async Task AcceptConnections()
    {
        while (_Active)
        {
            var client = await _Server.AcceptTcpClientAsync();
            DoStuffWithClient(client);
        }
    }

    private void DoStuffWithClient(TcpClient client)
    {
        // ...
    }

}

主要内容:

    static void Main(string[] args)
    {
        var server = new Server();
        server.StartListening();

        Thread.Sleep(5000);

        server.StopListening();
        Console.Read();
    }

在这一行抛出了一个异常。

        await AcceptConnections();
当我调用Server.StopListening()时,对象会被删除。
所以我的问题是,如何取消AcceptTcpClientAsync()以正确关闭TcpListener。

1
在SO上找到了答案:[https://dev59.com/y2Uq5IYBdhLWcg3waPpC][1]谢谢 - Bastiflew
为什么不使用 try {} 来捕获异常? - Martin Meeser
8个回答

11

由于这里没有适当的工作示例,因此提供一个:

假设您在范围内拥有cancellationTokentcpListener,则可以执行以下操作:

using (cancellationToken.Register(() => tcpListener.Stop()))
{
    try
    {
        var tcpClient = await tcpListener.AcceptTcpClientAsync();
        // … carry on …
    }
    catch (InvalidOperationException)
    {
        // Either tcpListener.Start wasn't called (a bug!)
        // or the CancellationToken was cancelled before
        // we started accepting (giving an InvalidOperationException),
        // or the CancellationToken was cancelled after
        // we started accepting (giving an ObjectDisposedException).
        //
        // In the latter two cases we should surface the cancellation
        // exception, or otherwise rethrow the original exception.
        cancellationToken.ThrowIfCancellationRequested();
        throw;
    }
}

我在tcpListener.AcceptTcpClientAsync()处发生System.ObjectDisposedException异常,因为它在一个while循环中。所以这并不是一个好的解决方案。 - Nicolas VERHELST
这看起来像是最好的解决方案。不要在阻塞方法上处理OperationCanceledException异常,而是从阻塞的AcceptTcpClientAsync方法中处理ObjectDisposedException异常。每次取消时都必须创建一个新的TcpListener。 - C.M.

6

虽然有一个基于Stephen Toub的博客文章的相对复杂的解决方案,但是使用内置的.NET API有更简单的解决方案:

var cancellation = new CancellationTokenSource();
await Task.Run(() => listener.AcceptTcpClientAsync(), cancellation.Token);

// somewhere in another thread
cancellation.Cancel();

这个解决方案不会终止挂起的接受调用。但其他解决方案也没有做到这一点,而且这个解决方案至少更短。
更新:一个更完整的示例,展示了在取消信号后应该发生的情况:
var cancellation = new CancellationTokenSource();
var listener = new TcpListener(IPAddress.Any, 5555);
listener.Start();
try
{
    while (true)
    {
        var client = await Task.Run(
            () => listener.AcceptTcpClientAsync(),
            cancellation.Token);
        // use the client, pass CancellationToken to other blocking methods too
    }
}
finally
{
    listener.Stop();
}

// somewhere in another thread
cancellation.Cancel();
更新2:Task.Run只在任务开始时检查取消标记。为了加快接受循环的终止速度,您可能希望注册取消操作:
cancellation.Token.Register(() => listener.Stop());

7
这会导致套接字泄漏,异步调用并使端口永久被占用。 - usr
2
是的,它是无处不在的,但你仍然必须调用Stop。这不是二选一的问题。要么停止,要么停止并处理CT。始终停止。即使使用CT,这也会导致异常,必须加以处理。 - usr
1
@usr 实际上,CancellationToken不会引发任何套接字异常,至少在接受循环中不会。循环将以TaskCanceledException退出,这比在另一个线程异步调用TcpListener.Stop()时抛出的未记录(可能含糊不清)的异常更容易进行编程。此外,问题在于几乎总是有更多需要清理的内容,而不仅仅是TcpListener(考虑已经打开的连接)。 - Robert Važan
1
我认为你在回答中已经“提到”了这一点,但仍然可能会误导人们 - 你的回答并不起作用,因为一旦监听器启动并开始监听,就无法使用取消标记来取消此任务。如果你已经知道这一点,我不明白为什么你还要提出来并让人困惑。-1。 - KFL
1
@Salgat确实,这将一直阻止直到下一次连接尝试,在那时取消将被观察到。可能会想要在那里注册取消操作并关闭侦听器。 - Robert Važan
显示剩余6条评论

4

对我来说很有用: 创建一个本地虚拟客户端连接到监听器,当连接被接受后,只需不再执行另一个异步接受操作(使用活动标志)。

// This is so the accept callback knows to not 
_Active = false;

TcpClient dummyClient = new TcpClient();
dummyClient.Connect(m_listener.LocalEndpoint as IPEndPoint);
dummyClient.Close();

这可能是一种黑客方式,但它似乎比其他选项更美观 :)

这是一个有趣的黑客技巧。如果侦听器没有在回环接口上监听,则无法工作。 - usr
对我来说已经足够好(而且很简单),只需要在Connect()之后执行Application.DoEvents(),以便在某些情况下放手终止AcceptTcpClientAsync()。 - Fabien

3

调用 StopListening (会释放套接字)是正确的。只需忽略该特定错误。您无法避免这种情况,因为您需要以某种方式停止挂起的呼叫。如果不这样做,套接字和挂起的异步IO都将泄漏,且端口仍在使用中。


2

定义这个扩展方法:

public static class Extensions
{
    public static async Task<TcpClient> AcceptTcpClientAsync(this TcpListener listener, CancellationToken token)
    {
        try
        {
            return await listener.AcceptTcpClientAsync();
        }
        catch (Exception ex) when (token.IsCancellationRequested) 
        { 
            throw new OperationCanceledException("Cancellation was requested while awaiting TCP client connection.", ex);
        }
    }
}

在使用扩展方法接受客户端连接之前,请执行以下操作:

token.Register(() => listener.Stop());

0

当我持续监听新连接客户端时,我使用了以下解决方案:

public async Task ListenAsync(IPEndPoint endPoint, CancellationToken cancellationToken)
{
    TcpListener listener = new TcpListener(endPoint);
    listener.Start();

    // Stop() typically makes AcceptSocketAsync() throw an ObjectDisposedException.
    cancellationToken.Register(() => listener.Stop());

    // Continually listen for new clients connecting.
    try
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            Socket clientSocket = await listener.AcceptSocketAsync();
        }
    }
    catch (OperationCanceledException) { throw; }
    catch (Exception) { cancellationToken.ThrowIfCancellationRequested(); }
}
  • 当被取消时,我注册回调以调用实例上的Stop()
  • AcceptSocketAsync通常会立即抛出一个ObjectDisposedException异常。
  • 我捕获任何OperationCanceledException之外的Exception,以抛出一个"合理的"OperationCanceledException给外部调用者。

我对async编程相当陌生,所以如果这种方法有问题,请原谅 - 我很乐意看到它被指出来以便学习!


0

取消令牌有一个委托,您可以使用它来停止服务器。当服务器停止时,任何正在侦听的连接调用都将抛出套接字异常。

请参见以下代码:

public class TcpListenerWrapper
{
    // helper class would not be necessary if base.Active was public, c'mon Microsoft...
    private class TcpListenerActive : TcpListener, IDisposable
    {
        public TcpListenerActive(IPEndPoint localEP) : base(localEP) {}
        public TcpListenerActive(IPAddress localaddr, int port) : base(localaddr, port) {}
        public void Dispose() { Stop(); }
        public new bool Active => base.Active;
    }

    private TcpListenerActive server

    public async Task StartAsync(int port, CancellationToken token)
    {
        if (server != null)
        {
            server.Stop();
        }

        server = new TcpListenerActive(IPAddress.Any, port);
        server.Start(maxConnectionCount);
        token.Register(() => server.Stop());
        while (server.Active)
        {
            try
            {
                await ProcessConnection();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }

    private async Task ProcessConnection()
    {
        using (TcpClient client = await server.AcceptTcpClientAsync())
        {
            // handle connection
        }
    }
}

0

https://learn.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socket.beginaccept?view=net-5.0

要取消对 BeginAccept 方法的挂起调用,请关闭 Socket。当在异步操作正在进行时调用 Close 方法时,将调用提供给 BeginAccept 方法的回调。后续对 EndAccept 方法的调用将引发 ObjectDisposedException,以指示操作已被取消。

这里是 TcpListner.cs 的反编译代码。

    [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true)]
    public Task<TcpClient> AcceptTcpClientAsync()
    {
        return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
    }

    /// <summary>Asynchronously accepts an incoming connection attempt and creates a new <see cref="T:System.Net.Sockets.TcpClient" /> to handle remote host communication.</summary>
    /// <returns>A <see cref="T:System.Net.Sockets.TcpClient" />.</returns>
    /// <param name="asyncResult">An <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.Net.Sockets.TcpListener.BeginAcceptTcpClient(System.AsyncCallback,System.Object)" /> method.</param>
    /// <PermissionSet>
    ///   <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
    ///   <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    /// </PermissionSet>
    public TcpClient EndAcceptTcpClient(IAsyncResult asyncResult)
    {
        if (Logging.On)
        {
            Logging.Enter(Logging.Sockets, this, "EndAcceptTcpClient", null);
        }
        if (asyncResult == null)
        {
            throw new ArgumentNullException("asyncResult");
        }
        LazyAsyncResult lazyResult = asyncResult as LazyAsyncResult;
        Socket asyncSocket = (lazyResult == null) ? null : (lazyResult.AsyncObject as Socket);
        if (asyncSocket == null)
        {
            throw new ArgumentException(SR.GetString("net_io_invalidasyncresult"), "asyncResult");
        }
        Socket socket = asyncSocket.EndAccept(asyncResult);
        if (Logging.On)
        {
            Logging.Exit(Logging.Sockets, this, "EndAcceptTcpClient", socket);
        }
        return new TcpClient(socket);
    }

    /// <summary>Begins an asynchronous operation to accept an incoming connection attempt.</summary>
    /// <returns>An <see cref="T:System.IAsyncResult" /> that references the asynchronous creation of the <see cref="T:System.Net.Sockets.TcpClient" />.</returns>
    /// <param name="callback">An <see cref="T:System.AsyncCallback" /> delegate that references the method to invoke when the operation is complete.</param>
    /// <param name="state">A user-defined object containing information about the accept operation. This object is passed to the <paramref name="callback" /> delegate when the operation is complete.</param>
    /// <exception cref="T:System.Net.Sockets.SocketException">An error occurred while attempting to access the socket. See the Remarks section for more information. </exception>
    /// <exception cref="T:System.ObjectDisposedException">The <see cref="T:System.Net.Sockets.Socket" /> has been closed. </exception>
    /// <PermissionSet>
    ///   <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
    ///   <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    /// </PermissionSet>
    [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true)]
    public IAsyncResult BeginAcceptTcpClient(AsyncCallback callback, object state)
    {
        if (Logging.On)
        {
            Logging.Enter(Logging.Sockets, this, "BeginAcceptTcpClient", null);
        }
        if (!m_Active)
        {
            throw new InvalidOperationException(SR.GetString("net_stopped"));
        }
        IAsyncResult result = m_ServerSocket.BeginAccept(callback, state);
        if (Logging.On)
        {
            Logging.Exit(Logging.Sockets, this, "BeginAcceptTcpClient", null);
        }
        return result;
    }

并反编译Socket.cs文件。

    /// <summary>Asynchronously accepts an incoming connection attempt and creates a new <see cref="T:System.Net.Sockets.Socket" /> to handle remote host communication.</summary>
    /// <returns>A <see cref="T:System.Net.Sockets.Socket" /> to handle communication with the remote host.</returns>
    /// <param name="asyncResult">An <see cref="T:System.IAsyncResult" /> that stores state information for this asynchronous operation as well as any user defined data. </param>
    /// <exception cref="T:System.ArgumentNullException">
    ///   <paramref name="asyncResult" /> is null. </exception>
    /// <exception cref="T:System.ArgumentException">
    ///   <paramref name="asyncResult" /> was not created by a call to <see cref="M:System.Net.Sockets.Socket.BeginAccept(System.AsyncCallback,System.Object)" />. </exception>
    /// <exception cref="T:System.Net.Sockets.SocketException">An error occurred when attempting to access the socket. See the Remarks section for more information. </exception>
    /// <exception cref="T:System.ObjectDisposedException">The <see cref="T:System.Net.Sockets.Socket" /> has been closed. </exception>
    /// <exception cref="T:System.InvalidOperationException">
    ///   <see cref="M:System.Net.Sockets.Socket.EndAccept(System.IAsyncResult)" /> method was previously called. </exception>
    /// <exception cref="T:System.NotSupportedException">Windows NT is required for this method. </exception>
    /// <PermissionSet>
    ///   <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    ///   <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
    ///   <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
    /// </PermissionSet>
    public Socket EndAccept(IAsyncResult asyncResult)
    {
        if (s_LoggingEnabled)
        {
            Logging.Enter(Logging.Sockets, this, "EndAccept", asyncResult);
        }
        if (CleanedUp)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
        byte[] buffer;
        int bytesTransferred;
        if (asyncResult != null && asyncResult is AcceptOverlappedAsyncResult)
        {
            return EndAccept(out buffer, out bytesTransferred, asyncResult);
        }
        if (asyncResult == null)
        {
            throw new ArgumentNullException("asyncResult");
        }
        AcceptAsyncResult castedAsyncResult = asyncResult as AcceptAsyncResult;
        if (castedAsyncResult == null || castedAsyncResult.AsyncObject != this)
        {
            throw new ArgumentException(SR.GetString("net_io_invalidasyncresult"), "asyncResult");
        }
        if (castedAsyncResult.EndCalled)
        {
            throw new InvalidOperationException(SR.GetString("net_io_invalidendcall", "EndAccept"));
        }
        object result = castedAsyncResult.InternalWaitForCompletion();
        castedAsyncResult.EndCalled = true;
        Exception exception = result as Exception;
        if (exception != null)
        {
            throw exception;
        }
        if (castedAsyncResult.ErrorCode != 0)
        {
            SocketException socketException = new SocketException(castedAsyncResult.ErrorCode);
            UpdateStatusAfterSocketError(socketException);
            if (s_LoggingEnabled)
            {
                Logging.Exception(Logging.Sockets, this, "EndAccept", socketException);
            }
            throw socketException;
        }
        Socket acceptedSocket = (Socket)result;
        if (s_LoggingEnabled)
        {
            Logging.PrintInfo(Logging.Sockets, acceptedSocket, SR.GetString("net_log_socket_accepted", acceptedSocket.RemoteEndPoint, acceptedSocket.LocalEndPoint));
            Logging.Exit(Logging.Sockets, this, "EndAccept", result);
        }
        return acceptedSocket;
    }

看起来 AcceptTcpClientAsync() 内部使用了类似于 BeginAccept() 和 EndAccept() 的东西。在 Socket.cs 中,如果 CleanedUp 为 true,则会抛出 ObjectDisposedException,这意味着监听套接字已关闭。 因此,关闭监听套接字会导致 AcceptTcpClientAsync() 抛出 ObjectDisposedException。

namespace TestTcpListenStop {
    class Program {
        static TcpListener listner;

        static void Main(string[] args) {
            for (int i = 0; i < 100; ++i) {
                StartStopTest();
            }

            Console.ReadKey();
            return;
        }

        static void StartStopTest() {
            // start listner
            listner = new TcpListener(IPAddress.Any, 17000);
            listner.Start();

            // start accept
            Task tk = AcceptAsync();

            // do other things
            Task.Delay(1).Wait();

            // close listen socket
            listner.Stop();
            tk.Wait();
        
            return;
        }

        static async Task AcceptAsync() {
            Console.WriteLine("Accepting client...");

            TcpClient client;
            while (true) {
                try {
                    // Closing listen socket causes
                    // AcceptTcpClientAsync() throw ObjectDisposedException
                    client = await listner.AcceptTcpClientAsync().ConfigureAwait(false);
                    Console.WriteLine("A client has been accepted.");
                }
                catch (ObjectDisposedException) {
                    Console.WriteLine("This exception means listening socket closed.");
                    break;
                }

                // we just close.
                client.Client.Shutdown(SocketShutdown.Both);
                client.Close();
            }

            Console.WriteLine("AcceptAsync() terminated.");
        }
    }
}

https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.wait?view=net-5.0

取消 cancellationToken 取消令牌对正在运行的任务没有影响,除非它也被传递了取消令牌并准备好处理取消。将 cancellationToken 对象传递给此方法只是允许取消等待。

我认为使用取消令牌实际上并不能停止 AcceptTcpClientAsync()。我们只是取消等待,而不是 AcceptTcpClientAsync(),因为 AcceptTcpClientAsync() 没有接收到取消令牌作为参数。只有关闭监听套接字才能取消 AcceptTcpClientAsync()。请参见以下来自 msdn 的内容。

public class Example {
    public static void Main() {
        CancellationTokenSource ts = new CancellationTokenSource();

        Task t = Task.Run(() => {
            Console.WriteLine("Calling Cancel...");
            ts.Cancel();
            Task.Delay(5000).Wait();
            Console.WriteLine("Task ended delay...");
        });
        try {
            Console.WriteLine("About to wait for the task to complete...");
            t.Wait(ts.Token);
        }
        catch (OperationCanceledException e) {
            Console.WriteLine("{0}: The wait has been canceled. Task status: {1:G}",
                                e.GetType().Name, t.Status);
            Thread.Sleep(6000);
            Console.WriteLine("After sleeping, the task status:  {0:G}", t.Status);
        }
        ts.Dispose();
    }
}
// The example displays output like the following:
//    About to wait for the task to complete...
//    Calling Cancel...
//    OperationCanceledException: The wait has been canceled. Task status: Running
//    Task ended delay...
//    After sleeping, the task status:  RanToCompletion

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接