如何优雅地关闭在NamedPipeServer#WaitForConnection上被阻塞的线程?

46

我启动应用程序,它生成许多线程,每个线程创建一个命名管道服务器(.Net 3.5添加了命名管道IPC的托管类型),并等待客户端连接(阻塞)。该代码按预期运行。

private void StartNamedPipeServer()
  {
    using (NamedPipeServerStream pipeStream =
                    new NamedPipeServerStream(m_sPipeName, PipeDirection.InOut, m_iMaxInstancesToCreate, PipeTransmissionMode.Message, PipeOptions.None))
    {
      m_pipeServers.Add(pipeStream);
      while (!m_bShutdownRequested)
      {
        pipeStream.WaitForConnection();
        Console.WriteLine("Client connection received by {0}", Thread.CurrentThread.Name);
        ....  

现在我还需要一个Shutdown方法来清理这个进程。我尝试了通常的bool标志isShutdownRequested技巧。但是pipestream仍然在WaitForConnection()调用上阻塞,线程不会死亡。

public void Stop()
{
   m_bShutdownRequested = true;
   for (int i = 0; i < m_iMaxInstancesToCreate; i++)
   {
     Thread t = m_serverThreads[i];
     NamedPipeServerStream pipeStream = m_pipeServers[i];
     if (pipeStream != null)
     {
       if (pipeStream.IsConnected)
          pipeStream.Disconnect();
       pipeStream.Close();
       pipeStream.Dispose();
     }

     Console.Write("Shutting down {0} ...", t.Name);
     t.Join();
     Console.WriteLine(" done!");
   }
} 

Join方法不会返回。

一个我没有尝试但可能可行的选项是调用Thread.Abort并消耗异常。 但这感觉不对... 有什么建议吗?

更新2009-12-22
很抱歉没有早些发布此内容... 这是我从Kim Hamilton(BCL团队)收到的回复:

实现可中断的WaitForConnection的“正确”方法是调用BeginWaitForConnection,在回调中处理新连接,并关闭管道流以停止等待连接。 如果管道已关闭,则EndWaitForConnection将引发ObjectDisposedException,回调线程可以捕获它,清理任何松散的端口,并干净地退出。

我们意识到这必须是一个常见的问题,因此我的团队中的某个人计划很快发布博客文章。

7个回答

49

这很俗套,但这是我找到的唯一可行的方法。 创建一个“假”客户端并连接到您的命名管道,以绕过WaitForConnection。 每次都有效。

另外,即使Thread.Abort()也无法解决我的问题。


_pipeserver.Dispose();
_pipeserver = null;

using (NamedPipeClientStream npcs = new NamedPipeClientStream("pipename")) 
{
    npcs.Connect(100);
}

非常好,可以在不重写同步工作代码的情况下正常运行;-)) 谢谢 - Svisstack
这是一个非常简单的解决方案... 对我来说完美地运行。 - JayGee
奶酪,检查。工作,检查。在使用中应该对连接调用进行错误捕获。 - DAG

18

切换到异步版本:BeginWaitForConnection

如果它在某个时候完成,您需要一个标志,以便完成处理程序只需调用EndWaitForConnection吸收任何异常并退出(调用End...以确保可以清理任何资源)。


1
如果你想使用同步样式来使用它,调用Begin函数后,可以使用WaitHandle.WaitAny在IAsyncResult.AsyncWaitHandle以及你自己的“取消”ManualResetEvent上等待。然后在关闭时设置你的取消事件,你的线程将会被解除阻塞。 - donovan

8
你可以使用以下扩展方法。注意包含“ManualResetEvent cancelEvent” - 你可以从另一个线程设置此事件来发出信号,以指示等待连接方法现在应该中止并关闭管道。在设置m_bShutdownRequested时包括cancelEvent.Set(),则关闭应该相对优雅。
    public static void WaitForConnectionEx(this NamedPipeServerStream stream, ManualResetEvent cancelEvent)
    {
        Exception e = null;
        AutoResetEvent connectEvent = new AutoResetEvent(false);
        stream.BeginWaitForConnection(ar =>
        {
            try
            {
                stream.EndWaitForConnection(ar);
            }
            catch (Exception er)
            {
                e = er;
            }
            connectEvent.Set();
        }, null);
        if (WaitHandle.WaitAny(new WaitHandle[] { connectEvent, cancelEvent }) == 1)
            stream.Close();
        if (e != null)
            throw e; // rethrow exception
    }

1
你可以通过调用BeginWaitForConnection而不使用回调(使用null代替),然后在返回的结果的AsyncWaitHandle和cancelEvent上使用WaitHandle.WaitAny来简化此过程。 - Tony Edgecombe

1
一个最简单易行的解决方案是创建一个虚拟客户端并与服务器建立连接。
NamedPipeServerStream pServer;
bool exit_flg=false;
    public void PipeServerWaiter()
{

    NamedPipeServerStream  pipeServer = new NamedPipeServerStream("DphPipe", PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances);
    pServer = pipeServer;
    pipeServer.WaitForConnection();


    if (exit_flg) return;
    thread = new Thread(PipeServerWaiter);
    thread.Start();

}
public void Dispose()
{
    try
    {
        exit_flg = true;
        NamedPipeClientStream clt = new NamedPipeClientStream(".", "DphPipe");
        clt.Connect();
        clt.Close();

        pServer.Close();
        pServer.Dispose();


    }

1
我编写了这个扩展方法来解决这个问题:
public static void WaitForConnectionEx(this NamedPipeServerStream stream)
{
    var evt = new AutoResetEvent(false);
    Exception e = null;
    stream.BeginWaitForConnection(ar => 
    {
        try
        {
            stream.EndWaitForConnection(ar);
        }
        catch (Exception er)
        {
            e = er;
        }
        evt.Set();
    }, null);
    evt.WaitOne();
    if (e != null)
        throw e; // rethrow exception
}

1
你的方法旨在实现什么?目前,它只是使用异步对应项重新实现同步的“WaitForConnection”。 - Mark
如果您调用stream.Dispose(),当WaitForConnection阻塞某个线程时 - 不会发生任何事情,该线程将被阻塞。 如果在某个线程执行我的扩展方法时调用stream.Dispose() - WaitForConnectionEx将抛出异常。 - Evgeny Lazin

0

一种可行的方法是在 WaitForConnection 后立即检查 m_bShutdownRequested。

在关闭过程中设置布尔值。之后向所有现有管道发送虚拟消息,以便它们打开连接并检查布尔值,从而实现干净地关闭。


-1
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Windows;
using System.Windows.Controls;

namespace PIPESERVER
{
    public partial class PWIN : UserControl
   {
    public string msg = "", cmd = "", text = "";
    public NamedPipeServerStream pipe;
    public NamedPipeClientStream dummyclient;
    public string PipeName = "PIPE1";
    public static string status = "";
    private static int numThreads = 2;
    int threadId;
    int i;
    string[] word;
    char[] buffer;
    public StreamString ss;

    public bool ConnectDummyClient()
    {
        new Thread(() =>
        {
            dummyclient = new NamedPipeClientStream(".", "PIPE1");
            try
            {
                dummyclient.Connect(5000); // 5 second timeout
            }
            catch (Exception e)
            {
                Act.m.md.AMsg(e.Message); // Display error msg
                Act.m.console.PipeButton.IsChecked = false;
            }
        }).Start();
        return true;
    }

    public bool RaisePipe()
    {
        TextBlock tb = Act.m.tb;
        try
        {
            pipe = new NamedPipeServerStream("PIPE1", PipeDirection.InOut, numThreads);
            threadId = Thread.CurrentThread.ManagedThreadId;
            pipe.WaitForConnection();
            Act.m.md.Msg("Pipe Raised");
            return true;
        }
        catch (Exception e)
        {
            string err = e.Message;
            tb.Inlines.Add(new Run("Pipe Failed to Init on Server Side"));
            tb.Inlines.Add(new LineBreak());
            return false;
        }
    }

    public void ServerWaitForMessages()
    {
        new Thread(() =>
        {
            cmd = "";
            ss = new StreamString(pipe);
            while (cmd != "CLOSE")
            {
                try
                {
                    buffer = new char[256];
                    text = "";
                    msg = ss.ReadString().ToUpper();
                    word = msg.Split(' ');
                    cmd = word[0].ToUpper();
                    for (i = 1; i < word.Length; i++) text += word[i] + " ";
                    switch (cmd)
                    {
                        case "AUTHENTICATE": ss.WriteString("I am PIPE1 server"); break;
                        case "SOMEPIPEREQUEST":ss.WriteString(doSomePipeRequestReturningString()):break;
                        case "CLOSE": ss.WriteString("CLOSE");// reply to client
                            Thread.Sleep(1000);// wait for client to pick-up shutdown message
                            pipe.Close();
                            Act.m.md.Msg("Server Shutdown ok"); // Server side message
                            break;
                    }
                }
                catch (IOException iox)
                {
                    string error = iox.Message;
                    Act.m.md.Msg(error);
                    break;
                }
            }
        }).Start();
    }

    public void DummyClientCloseServerRequest()
    {
        StreamString ss = new StreamString(dummyclient);
        ss.WriteString("CLOSE");
        ss.ReadString();
    }

//用法,将ToggleButtons放置在StackPanel中,并在代码中进行支持:

private void PipeButton_Checked(object sender, RoutedEventArgs e)
    {
        Act.m.pwin.ConnectDummyClient();
        Act.m.pwin.RaisePipe();
    }
private void PipeButton_Unchecked(object sender, RoutedEventArgs e)
    {
        Act.m.pwin.DummyClientCloseServerRequest();
        Act.m.console.WaitButton.IsChecked = false;
        Keyboard.Focus(Act.m.md.tb1);
    }
private void WaitButton_Checked(object sender, RoutedEventArgs e)
    {
        Act.m.pwin.Wait();
    }
private void WaitButton_Unchecked(object sender, RoutedEventArgs e)
    {
    }

//对我来说非常有效。敬礼,zzzbc }


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接