如何在Python中停止一个循环线程?

99

如何告诉一个循环线程停止循环是合适的方式?

我有一个相当简单的程序,在一个独立的threading.Thread类中对指定的主机进行ping测试。在这个类中,它休眠60秒,再次运行直到应用程序退出。

我想在我的wx.Frame中实现一个“停止”按钮,以请求循环线程停止。它不需要立即结束线程,只需在下一次唤醒时停止循环即可。

这是我的threading类(请注意:我尚未实现循环,但它可能落在PingAssets的run方法下)

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset

    def run(self):
        config = controller.getConfig()
        fmt = config['timefmt']
        start_time = datetime.now().strftime(fmt)
        try:
            if onlinecheck.check_status(self.asset):
                status = "online"
            else:
                status = "offline"
        except socket.gaierror:
            status = "an invalid asset tag."
        msg =("{}: {} is {}.   \n".format(start_time, self.asset, status))
        wx.CallAfter(self.window.Logger, msg)

在我的 wxPython 框架中,我有一个从“开始”按钮调用的函数:

并且在我的wxPyhton框架中,我有一个从“开始”按钮调用的函数:

def CheckAsset(self, asset):
        self.count += 1
        thread = PingAssets(self.count, asset, self)
        self.threads.append(thread)
        thread.start()
8个回答

161

可停止的线程函数

我们可以修改函数来允许通过标志位来停止线程,而不必继承threading.Thread

我们需要一个对象,该对象对正在运行的函数可见,以便我们将标志位设置为停止运行。

我们可以使用threading.currentThread()对象。

import threading
import time


def doit(arg):
    t = threading.currentThread()
    while getattr(t, "do_run", True):
        print ("working on %s" % arg)
        time.sleep(1)
    print("Stopping as you wish.")


def main():
    t = threading.Thread(target=doit, args=("task",))
    t.start()
    time.sleep(5)
    t.do_run = False
    

if __name__ == "__main__":
    main()

诀窍在于,正在运行的线程可以附加其他属性。该解决方案建立在以下假设的基础上:

  • 线程具有默认值为True的属性“do_run”
  • 驱动父过程可以将已启动的线程的属性“do_run”分配给False

运行代码,我们得到以下输出:

$ python stopthread.py                                                        
working on task
working on task
working on task
working on task
working on task
Stopping as you wish.

使用Event来终止进程

另一种选择是将threading.Event作为函数参数。它默认是False,但外部进程可以将其“设置”(为True),函数可以使用wait(timeout)函数了解情况。

我们可以使用零超时时间进行wait,但我们也可以将其用作睡眠计时器(如下所示)。

def doit(stop_event, arg):
    while not stop_event.wait(1):
        print ("working on %s" % arg)
    print("Stopping as you wish.")


def main():
    pill2kill = threading.Event()
    t = threading.Thread(target=doit, args=(pill2kill, "task"))
    t.start()
    time.sleep(5)
    pill2kill.set()
    t.join()

编辑:我在Python 3.6中尝试过这个方法。stop_event.wait()会阻塞事件(以及while循环),直到释放。它不返回布尔值。使用stop_event.is_set()可以代替。

用一颗药丸停止多个线程

如果我们需要同时停止多个线程,使用药丸停止的优势就更明显了,因为一颗药丸可以同时作用于所有线程。

doit将不会发生任何改变,只有main处理线程的方式会略有不同。

def main():
    pill2kill = threading.Event()
    tasks = ["task ONE", "task TWO", "task THREE"]

    def thread_gen(pill2kill, tasks):
        for task in tasks:
            t = threading.Thread(target=doit, args=(pill2kill, task))
            yield t

    threads = list(thread_gen(pill2kill, tasks))
    for thread in threads:
        thread.start()
    time.sleep(5)
    pill2kill.set()
    for thread in threads:
        thread.join()

有一个问题,假设我们设置了一个内核信号,比如SIGALRM,在信号处理程序中,我们想使用您的方法(pill2kill.set然后join),停止进程和线程,然后sys.exit(0)。 1)如果您运行应用程序并等待n秒钟,它可以正常工作 2)如果您按下ctrl+c,它可以正常工作 3)但是,如果您按下ctrl+z,然后等待一些时间,然后“fg”恢复进程,如果SIGALRM的n秒已经过去,则进程将停止,但线程仍将继续工作几毫秒。 我有一段代码来证明它,你有什么想法吗? - KOrrosh Sh
我正在使用ThreadPoolExecutor,只有Futures而不是线程来设置一些属性(如“do_run”),因此我需要一个杀死线程的方法。 - Genarito
此外,如果您不想等待,可以使用 is_set 代替带有0超时的 wait - Genarito
threading.currentThreadthreading.current_thread 的一个已弃用的别名。详见 https://docs.python.org/3.10/library/threading.html#threading.current_thread。 - Filip Müller

29

14

我在Stack上阅读了其他问题,但仍然有点困惑如何跨类进行通信。这是我的方法:

我在wxFrame类的__init__方法中使用列表来保存所有的线程:self.threads = []

如何停止Python中的循环线程?所建议的,在我的线程类中使用一个信号,该信号在初始化线程类时设置为True

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset
        self.signal = True

    def run(self):
        while self.signal:
             do_stuff()
             sleep()

我可以通过迭代我的线程来停止这些线程:

def OnStop(self, e):
        for t in self.threads:
            t.signal = False

4
我有一个不同的方法。我已经创建了一个Thread类的子类,并在构造函数中创建了一个Event对象。然后,我编写了自定义的join()方法,该方法首先设置此事件,然后调用其父代版本。

这是我在wxPython应用程序中用于串口通信的类:

import wx, threading, serial, Events, Queue

class PumpThread(threading.Thread):

    def __init__ (self, port, queue, parent):
        super(PumpThread, self).__init__()
        self.port = port
        self.queue = queue
        self.parent = parent

        self.serial = serial.Serial()
        self.serial.port = self.port
        self.serial.timeout = 0.5
        self.serial.baudrate = 9600
        self.serial.parity = 'N'

        self.stopRequest = threading.Event()

    def run (self):
        try:
            self.serial.open()
        except Exception, ex:
            print ("[ERROR]\tUnable to open port {}".format(self.port))
            print ("[ERROR]\t{}\n\n{}".format(ex.message, ex.traceback))
            self.stopRequest.set()
        else:
            print ("[INFO]\tListening port {}".format(self.port))
            self.serial.write("FLOW?\r")

        while not self.stopRequest.isSet():
            msg = ''
            if not self.queue.empty():
                try:
                    command = self.queue.get()
                    self.serial.write(command)
                except Queue.Empty:
                    continue

            while self.serial.inWaiting():
                char = self.serial.read(1)
                if '\r' in char and len(msg) > 1:
                    char = ''
                    #~ print('[DATA]\t{}'.format(msg))
                    event = Events.PumpDataEvent(Events.SERIALRX, wx.ID_ANY, msg)
                    wx.PostEvent(self.parent, event)
                    msg = ''
                    break
                msg += char
        self.serial.close()

    def join (self, timeout=None):
        self.stopRequest.set()
        super(PumpThread, self).join(timeout)

    def SetPort (self, serial):
        self.serial = serial

    def Write (self, msg):
        if self.serial.is_open:
            self.queue.put(msg)
        else:
            print("[ERROR]\tPort {} is not open!".format(self.port))

    def Stop(self):
        if self.isAlive():
            self.join()

队列用于向端口发送消息,主循环接收响应。我没有使用serial.readline()方法,因为换行符不同,并且我发现使用io类会过于繁琐。

3

取决于您在线程中运行的内容。 如果那是您的代码,那么您可以实现一个停止条件(参见其他答案)。

但是,如果您想运行别人的代码,则应该分叉并启动一个进程。像这样:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()

现在,每当您想停止该进程时,可以像以下这样发送一个SIGTERM信号:

proc.terminate()
proc.join()

而且速度很快:只需几分之一秒。祝您使用愉快 :)


0

我发现派生自 threading.Thread 的类对封装线程功能很有用。您只需在此类的重写版本中提供自己的主循环即可。调用 start() 可以安排对象的 run() 方法在单独的线程中调用。

在主循环内,定期检查是否已设置 threading.Event。这样的事件是线程安全的。

在此类内部,您拥有自己的 join() 方法,它在调用基类的 join() 方法之前设置停止事件对象。它可以选择性地采用时间值传递给基类的 join() 方法,以确保您的线程在短时间内终止。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, sleep_time=0.1):
        self._stop_event = threading.Event()
        self._sleep_time = sleep_time
        """call base class constructor"""
        super().__init__()

    def run(self):
        """main control loop"""
        while not self._stop_event.isSet():
            #do work
            print("hi")
            self._stop_event.wait(self._sleep_time)

    def join(self, timeout=None):
        """set stop event and join within a given time period"""
        self._stop_event.set()
        super().join(timeout)


if __name__ == "__main__":
    t = MyThread()
    t.start()

    time.sleep(5)

    t.join(1) #wait 1s max

在检查 threading.Event 之前,在主循环中小睡一会比连续循环更少占用 CPU。您可以设置默认的睡眠时间(例如0.1秒),但也可以在构造函数中传递值。


0
有时您无法控制运行目标。在这些情况下,您可以使用 signal.pthread_kill 发送停止信号。
from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep

def target():
    for num in count():
        print(num)
        sleep(1)

thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)

结果

0
1
2
3
4

[14]+  Stopped

0

我的解决方案是:

import threading, time

def a():
    t = threading.currentThread()
    while getattr(t, "do_run", True):
    print('Do something')
    time.sleep(1)

def getThreadByName(name):
    threads = threading.enumerate() #Threads list
    for thread in threads:
        if thread.name == name:
            return thread

threading.Thread(target=a, name='228').start() #Init thread
t = getThreadByName('228') #Get thread by name
time.sleep(5)
t.do_run = False #Signal to stop thread
t.join()

1
在代码后面或下面添加#,并解释该行的作用。仅仅倾泻代码会让我们“不聪明”;-)提示:将您的答案制作成其他人可以粘贴到他们的解释器中的工作解决方案。添加额外的代码是允许的。欢迎来到SO,祝您愉快。评论结束。 - ZF007
没有必要将 Thread 对象转换为字符串,然后解析该字符串以获取线程的名称,因为您可以使用其 name 属性。命名是此代码中的另一个问题,例如,我不明白为什么 flex 是线程列表的合理名称。您可以在这里找到更多有关如何改进代码的方法。 - cl0ne

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接