多线程服务器中的Monitor.Wait/Pulse竞态条件问题

8

我在一个多线程TCP服务器中使用互锁的Monitor.Wait和Monitor.Pulse时遇到了问题。为了说明我的问题,这是我的服务器代码:

public class Server
{
    TcpListener listener;
    Object sync;
    IHandler handler;
    bool running;

    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Monitor.Pulse(sync);
        } 

        if (running)
        {
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}

以下是我的客户端代码:

class Client
{
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count = 0;
            while ((count = stream.Read(buffer, 0, 1024)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);

        server1.Start();
        server2.Start();

        Console.WriteLine("Press return to test...");
        Console.ReadLine();

        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);

        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();

        server1.Stop();
        server2.Stop();

        return 0;
    }

    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);

        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
        }
    }
}

客户端发送了七条消息,但服务器只打印了四条:
按回车键进行测试...
按回车键终止... Echo Handler: Test1 Echo Handler: Test3 Echo Handler: Test2 Echo Handler: Test4
我怀疑监视器在允许服务器的 Accept 方法中发生 Pulse 之前就已经发生了 Wait(在 ThreadStart 方法中),即使 ThreadStart 方法仍然持有对 sync 对象的锁,直到它调用 Monitor.Wait(),然后 Accept 方法可以获取该锁并发送其 Pulse。如果您在服务器的 Stop() 方法中注释掉这两行代码,则可以解决此问题:
//listener.Stop();
//running = false;

当服务器的Stop()方法被调用时,剩余的消息将出现(即唤醒服务器的sync对象会使其分派剩余的传入消息)。在我看来,这只能发生在ThreadStartAccept方法之间的竞态条件中,但是围绕sync对象的锁应该可以防止这种情况发生。
有什么想法吗?
非常感谢, 西蒙。
附注:请注意,我意识到输出显示顺序等问题,我特别询问锁和监视器之间的竞争条件。谢谢,SH。
1个回答

5
问题在于您正在使用Pulse/Wait作为信号。一个适当的信号,如AutoResetEvent,具有状态,使其保持信号状态,直到线程调用WaitOne()。如果没有任何线程在等待,调用Pulse将成为无操作。
这与锁可以被同一线程多次获取的事实相结合。由于您正在使用异步编程,因此接受回调可以由执行BeginAcceptTcpClient的同一线程调用。
让我举个例子。我注释掉了第二个服务器,并更改了您的服务器上的一些代码。
void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        lock (sync)
        {
            while (running)
            {
                try
                {
                    Console.WriteLine("BeginAccept [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                    Console.WriteLine("Wait [{0}]", 
                        Thread.CurrentThread.ManagedThreadId);
                    Monitor.Wait(sync);  // Release lock and wait for a pulse
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    lock (sync)
    {
        Console.WriteLine("Pulse [{0}]", 
            Thread.CurrentThread.ManagedThreadId);
        Monitor.Pulse(sync);
    }
    if (running)
    {
        TcpListener localListener = (TcpListener)result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}

下面是我的运行输出。如果您自己运行此代码,值将不同,但总体上是相同的。

Press return to test...
BeginAccept [3]
Wait [3]

Press return to terminate...
Pulse [5]
BeginAccept [3]
Pulse [3]
Echo Handler: Test1
Echo Handler: Test3
Wait [3]

如您所见,有两个脉冲被调用,一个来自另一个线程(Pulse [5]),它唤醒了第一个等待。然后线程3执行另一个BeginAccept,但由于有挂起的传入连接,该线程决定立即调用Accept回调。由于同一线程调用了Accept,因此Lock(sync)不会阻塞,而是在空线程队列上立即发出Pulse [3]。
两个处理程序被调用并处理了两条消息。
一切都很好,ThreadStart重新开始运行并无限期地等待。
现在,这里的根本问题是您正在尝试使用监视器作为信号。由于它不记住状态,第二个脉冲就丢失了。
但是有一个简单的解决方案。使用AutoResetEvents,这是一个适当的信号,它将记住其状态。
public Server(IHandler handler, int port)
{
    this.handler = handler;
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
    listener = new TcpListener(address, port);
    running = false;
    _event = new AutoResetEvent(false);
}

public void Start()
{
    Thread thread = new Thread(ThreadStart);
    thread.Start();
}

public void Stop()
{
    listener.Stop();
    running = false;
    _event.Set();
}

void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        while (running)
        {
            try
            {
                listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                _event.WaitOne();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Let the server continue listening
    _event.Set();
    if (running)
    {
        TcpListener localListener = (TcpListener) result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}

谢谢Mats。我假设BeginAcceptTcpClient总是在单独的线程上运行,因此我可以使用同步对象作为关键部分。你说得对,信号是正确的方法。再次感谢。SH - Simon Haines

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接