是否可能在不设置/检查任何标志/信号量等的情况下终止正在运行的线程?
是否可能在不设置/检查任何标志/信号量等的情况下终止正在运行的线程?
在Python中,以及任何其他语言中,突然终止线程通常是一个不好的模式。考虑以下情况:
如果您可以承担得起(如果您正在管理自己的线程),处理这种情况的好方法是拥有一个退出请求标志,每个线程定期检查该标志,以确定是否是退出的时候。
例如:
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
stop()
,并使用join()
等待线程正确退出。线程应该定期检查停止标志。def _async_raise(tid, exctype):
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadWithExc(threading.Thread):
'''A thread class that supports raising an exception in the thread from
another thread.
'''
def _get_my_tid(self):
"""determines this (self's) thread id
CAREFUL: this function is executed in the context of the caller
thread, to get the identity of the thread represented by this
instance.
"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")
# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id
# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid
# TODO: in python 2.6, there's a simpler way to do: self.ident
raise AssertionError("could not determine the thread's id")
def raise_exc(self, exctype):
"""Raises the given exception type in the context of this thread.
If the thread is busy in a system call (time.sleep(),
socket.accept(), ...), the exception is simply ignored.
If you are sure that your exception should terminate the thread,
one way to ensure that it works is:
t = ThreadWithExc( ... )
...
t.raise_exc( SomeException )
while t.isAlive():
time.sleep( 0.1 )
t.raise_exc( SomeException )
If the exception is to be caught by the thread, you need a way to
check that your thread has caught it.
CAREFUL: this function is executed in the context of the
caller thread, to raise an exception in the context of the
thread represented by this instance.
"""
_async_raise( self._get_my_tid(), exctype )
PyThreadState_SetAsyncExc
返回值的引用似乎来自于一个旧版本的Python。)SO_REUSEADDR
套接字选项来避免出现“地址已在使用”的错误。 - Messares != 1
的情况下,我必须传递 None
而不是 0
,并且我必须调用 ctypes.c_long(tid)
并将其直接传递给任何 ctypes 函数,而不是直接传递 tid。 - Walt Wmultiprocessing.Process
可以使用p.terminate()
方法终止进程。
如果我想要杀死一个线程,但是不想使用标志、锁、信号量、事件或其他方式,我会将该线程升级为完整的进程。对于只使用少量线程的代码,开销并不大。
例如,这对于易于终止执行阻塞I/O的帮助程序“线程”非常有用。
转换很简单:在相关代码中,将所有threading.Thread
替换为multiprocessing.Process
,将所有queue.Queue
替换为multiprocessing.Queue
并在父进程中添加需要调用p.terminate()
来杀死其子进程p
的必需语句。
请参阅Python文档。
示例:
import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate() # sends a SIGTERM
multiprocessing
很好用,但需要注意的是参数会被序列化到新进程中。所以如果其中一个参数是无法序列化的(比如 logging.log
),使用 multiprocessing
可能不是个好主意。 - Lyagermultiprocessing
参数被序列化并传递给新的进程,但是在Linux系统中会使用forking方法复制它们(Python 3.7版本如此,其他版本未知)。因此,你可能会得到在Linux上正常运行但在Windows上引发pickle错误的代码。 - nyanpasu64没有官方API可以做到这一点。
您需要使用平台API来终止线程,例如pthread_kill或TerminateThread。您可以通过pythonwin或ctypes访问此类API。
请注意,这本质上是不安全的。它可能会导致无法收集的垃圾(来自成为垃圾的堆栈帧的局部变量),并且如果在杀死线程时该线程具有GIL,则可能会导致死锁。
Thread.daemon
。就像其他人提到的那样,通常会设置一个停止标志。对于一些轻量级的情况(没有线程的子类化,没有全局变量),可以使用 lambda 回调函数进行处理。(请注意,在 if stop()
中要加上括号。)
import threading
import time
def do_work(id, stop):
print("I am thread", id)
while True:
print("I am thread {} doing something".format(id))
if stop():
print(" Exiting loop.")
break
print("Thread {}, signing off".format(id))
def main():
stop_threads = False
workers = []
for id in range(0,3):
tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
workers.append(tmp)
tmp.start()
time.sleep(3)
print('main: done sleeping; time to stop the threads.')
stop_threads = True
for worker in workers:
worker.join()
print('Finis.')
if __name__ == '__main__':
main()
将print()
替换为一个总是刷新(sys.stdout.flush()
)的pr()
函数可能会提高Shell输出的精度。
(仅在Windows/Eclipse/Python3.3上测试过)
pr()
函数是什么? - alperyour_process.terminate() # kill the process!
your_thread.daemon = True # set the Thread as a "daemon thread"
start()
方法之前,将线程设置为daemon
是必要的!multiprocessing
中使用daemon
。在这里,当主进程退出时,它会尝试终止所有守护子进程。sys.exit()
和os.kill()
不是选择。这是基于thread2 -- 可停止的线程 ActiveState食谱。
你需要调用PyThreadState_SetAsyncExc()
,该函数只能通过ctypes
模块使用。
这仅在Python 2.7.3上进行了测试,但很可能适用于其他不久前的2.x版本。 PyThreadState_SetAsyncExc()
在Python 3中仍然存在以保持向后兼容性(但我没有测试过)。
import ctypes
def terminate_thread(thread):
"""Terminates a python thread from another thread.
:param thread: a threading.Thread instance
"""
if not thread.isAlive():
return
exc = ctypes.py_object(SystemExit)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), exc)
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
KeyboardInterrupt
信号,以便它们有机会进行清理。如果在此之后它们仍然挂起,那么 SystemExit
是适当的,或者可以从终端杀死该进程。 - drevickopthread_cleanup_push()/_pop()
来使线程终止在CPython中变得安全,但要正确实现它需要大量的工作,并且会明显减慢解释器的速度。 - Matthias Urlichs强制杀死线程时,应该与其合作而不是单方面操作。
强制终止线程会破坏try/finally代码块设置的所有保证,可能会导致锁定锁住、文件未关闭等问题。
唯一可以争辩认为强制杀死线程是个好主意的时候是为了快速结束程序,但永远不要单独终止某个线程。
time.sleep()
(比如轮询一些外部服务),那么对于Phillipe的方法的改进是在你的sleep()
语句处使用event
的wait()
方法的超时时间。例如:import threading
class KillableThread(threading.Thread):
def __init__(self, sleep_interval=1):
super().__init__()
self._kill = threading.Event()
self._interval = sleep_interval
def run(self):
while True:
print("Do Something")
# If no kill signal is set, sleep for the interval,
# If kill signal comes in while sleeping, immediately
# wake up and handle
is_killed = self._kill.wait(self._interval)
if is_killed:
break
print("Killing Thread")
def kill(self):
self._kill.set()
t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread
wait()
而不是sleep()
并定期检查事件的优点在于,您可以编程更长的睡眠间隔,线程几乎立即停止(当您将否则处于sleep()
状态时),并且在我看来,处理退出的代码显着更简单。time.sleep
并将轮询间隔缩小,以便我的脚本能够更快地响应。然而,这个解决方案具有使轮询间隔变小的所有好处,却没有浪费计算资源的缺点。+1 非常感谢。 - A Kareem您可以通过在将退出线程的跟踪中安装跟踪来终止线程。 可以查看附带链接以获取一种可能的实现方式。