如何退出一个多线程程序?

7

我在使用Python进行线程处理时,写了一个基本的即时通讯程序 [代码见底部]

注意到当我用C-c杀掉程序时,它并没有退出,而是一直挂起。

我猜测这是因为它在等待每个线程完成它们正在做的事情,但由于这是一个无限循环,这永远不会发生。
所以我想手动杀死每个线程,或者在接收到kill信号时结束循环。
怎么做呢?

# code for basic IM thingy import threading import time def worker(): print "Worker thread started" while True: time.sleep(1) print "Worker thread is still running"
t = threading.Thread(target=worker) t.start()
while True: pass
#!/usr/bin/env python
import threading
import socket

class Listen(threading.Thread):

    def run(self):
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', 2727))
        conn.listen(1)
        while True:
            channel, details = conn.accept()
            print str(details)+": "+channel.recv(250)
            channel.send("got it")
            channel.close()

class Shout(threading.Thread):

    def run(self):
        while True:
            try:    
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, 2727))
                break
            except:
                print "can't connect to "+ str(address)
        while True:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect((address, 2727))
            conn.send(raw_input())
            conn.close()

listen = Listen().start()
shout = Shout().start()

你需要自己处理信号(SIGINT)并停止你的线程。我不是Python专家,所以其他人应该能够提供Python信号处理的具体示例。与此同时,你可能想要在谷歌上搜索一下(Python信号处理)。 - Brian Roach
在Debian上运行它没有问题,但在Windows上会出现异常。 - Steinthor.palsson
由于标题不具指示性和混淆,并且没有重用价值,因此投票关闭:真正的问题是特定代码中存在多个具体问题,而不是“如何退出多线程程序”。 - ivan_pozdeev
2个回答

7
我看到你的代码中存在一些问题,以下是原因:
  1. Ctrl+C会在主线程中引发"KeyboardInterrupt"异常。因此,你应该在那里处理它。
  2. 你的套接字处于阻塞模式。这会导致几个套接字函数阻塞调用线程,直到函数返回。在此状态下,线程无法响应任何终止事件。
  3. 正如你所说:线程的run()函数中的无限循环是...真的是无限的。因此,线程执行永远不会结束(至少不会没有意外异常)。你应该使用某种同步对象,比如一个线程.Event对象,以便能够从外部告诉线程它应该终止自己。
  4. 我不建议在主线程之外使用raw_input()。想象一下当你有多个Shout线程时会发生什么。
  5. 为什么在Shout类中传输消息后总是关闭和重新连接套接字?网络连接应该仅在特殊情况下重新建立,因为设置成本很高。
  6. 如果没有通信的帧协议,你永远不能指望在recv()函数返回时已经收到了其他主机发送的所有数据。
  7. 线程对象的start()函数不返回值或对象。因此保存返回的值(=None)没有太多意义。
  8. 你永远不能指望send()函数传输所有传递的数据。因此,你必须检查函数的结果,并在没有真正传输所有字节时适当地处理情况。
  9. 学习线程编程时,肯定有比网络通信更好的问题需要解决,因为这个主题本身就非常复杂。
除了这些问题,这是我尝试的解决方案。仍然有很多可以改进的地方。你应该考虑Mark Tolonen的答案,因为SocketServer类肯定提供了简化处理这种事情的方法。但你也应该继续学习基础知识。
#!/usr/bin/env python
import threading
import socket
import time
import errno

class StoppableThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()        

    def stop(self):
        if self.isAlive() == True:
            # set event to signal thread to terminate
            self.stop_event.set()
            # block calling thread until thread really has terminated
            self.join()

class Accept(StoppableThread):
    def __init__(self, port):
        StoppableThread.__init__(self)
        self.port = port
        self.threads = []

    def run(self):     
        # handle connection acception
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', self.port ))
        conn.listen(5)
        # set socket timeout to ~10ms
        conn.settimeout(0.01)
        while self.stop_event.is_set() == False:
            try:
                csock, caddr = conn.accept()
                # spawn a new thread to handle the client connection
                listen_thread = Listen(csock, caddr)
                self.threads.append(listen_thread)
                listen_thread.start()
            except socket.timeout:
                # socket operation timeout
                # clear all terminated threads from thread list                
                for thread in self.threads:
                    if thread.isAlive() == False:
                        self.threads.remove(thread)

        self.stop_threads()

    def stop_threads(self):
        # stop all running threads
        for listen_thread in self.threads:
            if listen_thread.isAlive() == True:
                listen_thread.stop()
        self.threads = [] 

class Listen(StoppableThread):
    def __init__(self, csock, caddr):
        StoppableThread.__init__(self)
        self.csock = csock
        self.caddr = caddr
        self.csock.setblocking(False)

    def run(self):                
        while self.stop_event.is_set() == False:            
            try:                
                recv_data = self.csock.recv(250)
                if len(recv_data) > 0:       
                    print str(self.caddr)+": " + recv_data
                    self.csock.send("got it")                    
                else:
                    # connection was closed by foreign host
                    self.stop_event.set()
            except socket.error as (sock_errno, sock_errstr):
                if (sock_errno == errno.EWOULDBLOCK):
                    # socket would block - sleep sometime
                    time.sleep(0.1)                    
                else:
                    # unexpected / unhandled error - terminate thread
                    self.stop_event.set()
        channel.close()

class Shout(StoppableThread):
    def __init__(self, sport):
        StoppableThread.__init__(self)
        self.sport = sport

    def run(self):
        while self.stop_event.is_set() == False:
            try:    
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, self.sport))
                break
            except socket.error:
                # handle connection problems
                print "can't connect to "+ str(address)
            except: 
                # exit thread in case of an unexpected error
                self.stop_event.set()

        while self.stop_event.is_set() == False:
            try: 
                # chat loop: send messages to remote host            
                print "what to send? :",
                msg = raw_input()
                # beware: send() function may block indefinitly here and it might not send all bytes as expected !!
                conn.send(msg)
            except:
                # exit thread in case of an unexpected error
                self.stop_event.set()
        # close socket before thread terminates
        conn.close()

def main():
    do_exit = False
    server_port = 2727

    # start server socket thread
    accept = Accept(server_port)
    accept.start()

    # start transmitting client socket thread
    shout = Shout(server_port)
    shout.start()

    while do_exit == False:
        try:
            # sleep some time
            time.sleep(0.1)
        except KeyboardInterrupt:
            # Ctrl+C was hit - exit program
            do_exit = True

    # stop all running threads
    shout.stop()
    accept.stop()

    # exit main program after all threads were terminated gracefully    

if __name__ == "__main__":
    main()

5

请查看Python库SocketServer.py的源代码,特别是server_forever()的实现,以了解服务器如何实现退出。它使用select()轮询服务器套接字以获取新连接并测试退出标志。这是对您的源代码进行的一个hack,使用了SocketServer,并向Shout()添加了退出标志。它将运行Shout和Listen线程5秒钟,然后停止它们。

import socket
import SocketServer
import threading
import time

class Handler(SocketServer.StreamRequestHandler):
    def handle(self):
        print str(self.client_address) + ": " + self.request.recv(250)
        self.request.send("got it\n")

class Listen(threading.Thread):
    def run(self):
        self.server = SocketServer.TCPServer(('',2727),Handler)
        self.server.serve_forever()
    def stop(self):
        self.server.shutdown()

class Shout(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.quit = False
    def run(self):
        while not self.quit:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect(('localhost', 2727))
            conn.send('sending\n')
            print conn.recv(100)
            conn.close()
    def stop(self):
        self.quit = True

listen = Listen()
listen.start()
shout = Shout()
shout.start()

time.sleep(5)

shout.stop()
listen.stop()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接