Script hangs on exit when using atexit to terminate threads

4

I am using threads in Python 3.7.4, and I want to use atexit to register a cleanup function that will (cleanly) terminate the threads.

For example:

# example.py
import threading
import queue
import atexit
import sys

Terminate = object()

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            else:
                print("Received message: ", m)


def shutdown_threads(threads):
    for t in threads:
        print(f"Terminating thread {t}")
        t.send_message(Terminate)
    for t in threads:
        print(f"Joining on thread {t}")
        t.join()
    else:
        print("All threads terminated")

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    atexit.register(shutdown_threads, threads)

    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    sys.exit(0)

However, it seems that interacting with the threads and queues from within an atexit callback deadlocks some internal shutdown routine:

$ python example.py
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
^CException ignored in: <module 'threading' from '/usr/lib64/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/lib64/python3.7/threading.py", line 1308, in _shutdown
    lock.acquire()
KeyboardInterrupt
Terminating thread <Worker(Thread-1, started 140612492904192)>
Terminating thread <Worker(Thread-2, started 140612484511488)>
Terminating thread <Worker(Thread-3, started 140612476118784)>
Terminating thread <Worker(Thread-4, started 140612263212800)>
Terminating thread <Worker(Thread-5, started 140612254820096)>
Joining on thread <Worker(Thread-1, stopped 140612492904192)>
Joining on thread <Worker(Thread-2, stopped 140612484511488)>
Joining on thread <Worker(Thread-3, stopped 140612476118784)>
Joining on thread <Worker(Thread-4, stopped 140612263212800)>
Joining on thread <Worker(Thread-5, stopped 140612254820096)>
All threads terminated

(The KeyboardInterrupt is from me pressing Ctrl-C, since the process seemed to hang indefinitely.)

However, if I send the Terminate message before exiting (by uncommenting the line after t.send_message("Hello")), the program does not hang and terminates gracefully:

$ python example.py
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Terminating thread <Worker(Thread-1, stopped 140516051592960)>
Terminating thread <Worker(Thread-2, stopped 140516043200256)>
Terminating thread <Worker(Thread-3, stopped 140515961992960)>
Terminating thread <Worker(Thread-4, stopped 140515953600256)>
Terminating thread <Worker(Thread-5, stopped 140515945207552)>
Joining on thread <Worker(Thread-1, stopped 140516051592960)>
Joining on thread <Worker(Thread-2, stopped 140516043200256)>
Joining on thread <Worker(Thread-3, stopped 140515961992960)>
Joining on thread <Worker(Thread-4, stopped 140515953600256)>
Joining on thread <Worker(Thread-5, stopped 140515945207552)>
All threads terminated

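This raises the question: when does the threading._shutdown routine run relative to the atexit handlers? Does it make sense to interact with threads in an atexit handler at all?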

Why don't you want to execute the commented-out #t.send_message(Terminate)? - stovfl
1
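Apparently the interpreter doesn't call the atexit handlers until all non-daemon threads have exited, which sounds suspiciously like a bug that was fixed in Python 2.6.5 (see https://dev59.com/qFDTa4cB1Zd3GeqPJXmE and https://bugs.python.org/issue1722344). A workaround might be to wrap the main code in a `try` / `finally` and call shutdown_threads(threads) manually. - martineau
2 Answers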
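The try/finally workaround suggested in the comment above could look roughly like the following. This is only a minimal sketch: Worker and shutdown_threads mirror the question's code, while the `received` list and the `main` function are names added here for illustration.

```python
import queue
import threading

Terminate = object()  # sentinel, as in the question


class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()
        self.received = []  # illustrative: record messages instead of printing

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            self.received.append(m)


def shutdown_threads(threads):
    for t in threads:
        t.send_message(Terminate)
    for t in threads:
        t.join()


def main():
    threads = [Worker() for _ in range(5)]
    for t in threads:
        t.start()
    try:
        for t in threads:
            t.send_message("Hello")
    finally:
        # Runs while the main thread is still executing its own code,
        # i.e. before the interpreter starts waiting on the workers.
        shutdown_threads(threads)
    return threads


if __name__ == "__main__":
    workers = main()
    print([w.is_alive() for w in workers])
```

Because the shutdown happens in the finally block, it no longer depends on when the interpreter decides to run the atexit handlers.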

9

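You can use a daemon thread to ask your non-daemon threads to clean up gracefully. As an example of where this is necessary: if you are using a third-party library that starts a non-daemon thread, you would either have to change that library or do something like this: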

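import threading

def monitor_thread():
    # Block until the main thread has finished running its code.
    main_thread = threading.main_thread()
    main_thread.join()
    # Placeholder: ask the non-daemon thread(s) to shut down gracefully.
    send_signal_to_non_daemon_thread_to_gracefully_shutdown()


# Daemon, so the monitor itself never keeps the interpreter alive.
monitor = threading.Thread(target=monitor_thread)
monitor.daemon = True
monitor.start()

# Placeholder: e.g. the third-party library call that starts the thread.
start_non_daemon_thread()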

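Putting this into the context of the original post's code (note that we don't need the atexit function, since it would only be called after all non-daemon threads have stopped):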

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    
    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    def monitor_thread():
        main_thread = threading.main_thread()
        main_thread.join()
        shutdown_threads(threads)

    monitor = threading.Thread(target=monitor_thread)
    monitor.daemon = True
    monitor.start()

3
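This works as expected for me, but I noticed that it also works if monitor_thread is not a daemon thread. My explanation is that it is already waiting on main_thread.join(), so it wakes up as soon as main_thread exits. The docs say: "Daemon threads are abruptly stopped at shutdown." That makes me think we may not actually want monitor to be a daemon here. - lekv
A super big upvote and thank you here. I struggled with this exact problem all day: I wanted to start a quick thread (loading some data) at Django 4.2 startup. For some mysterious reason I hit exactly the same problem in local development mode with auto-reload (which uses sys.exit(3) on reload), and skipping atexit and signal handlers was the only solution that worked. - undefined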
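lekv's observation about a non-daemon monitor can be tried out with a sketch like the one below. The helper `start_monitor` and its `watched` and `daemon` parameters are hypothetical names introduced here; `watched` defaults to the main thread, and exists only so the behaviour can also be exercised against a short-lived stand-in thread.

```python
import queue
import threading

Terminate = object()  # sentinel, as in the question


class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            print("Received message: ", m)


def shutdown_threads(threads):
    for t in threads:
        t.send_message(Terminate)
    for t in threads:
        t.join()


def start_monitor(threads, watched=None, daemon=False):
    """Hypothetical helper: shut the workers down once `watched` finishes."""
    if watched is None:
        watched = threading.main_thread()

    def monitor():
        watched.join()             # wakes up when the watched thread ends
        shutdown_threads(threads)  # then asks the workers to stop

    m = threading.Thread(target=monitor, daemon=daemon)
    m.start()
    return m


if __name__ == "__main__":
    workers = [Worker() for _ in range(5)]
    for w in workers:
        w.start()
    for w in workers:
        w.send_message("Hello")
    start_monitor(workers)  # non-daemon by default in this sketch
    # Falling off the end lets the main thread finish; the monitor then runs.
```

With daemon=False the interpreter also waits for the monitor itself, so the shutdown request cannot be cut short by interpreter teardown.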

1

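atexit.register(func) registers func as a function to be executed at termination.

After the main thread executes its last line of code (sys.exit(0) in the example above), the interpreter calls threading._shutdown to wait for all non-daemon threads (the Workers created in the example above) to exit.

The entire Python program exits when no alive non-daemon threads are left.

So after you press CTRL+C, the wait inside threading._shutdown is interrupted by the KeyboardInterrupt raised for SIGINT (as the traceback shows), and only then does the interpreter call the functions registered with atexit.

By the way, if you pass daemon=True to Thread.__init__, the program runs straight through without any human interaction.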
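The ordering described above (non-daemon threads are waited for first, atexit handlers run afterwards) can be observed with a small experiment. This is a sketch that assumes CPython 3.7 or later; `CHILD_SCRIPT` and `run_child` are illustrative names, and the script is run in a child interpreter only so that its exit sequence can be captured.

```python
import subprocess
import sys
import textwrap

# Script executed in a separate interpreter so its shutdown can be observed.
CHILD_SCRIPT = textwrap.dedent("""
    import atexit
    import threading
    import time

    def worker():
        time.sleep(0.3)            # still running when the main thread ends
        print("worker finished")

    atexit.register(lambda: print("atexit handler ran"))
    threading.Thread(target=worker).start()   # non-daemon thread
    print("main thread done")
""")


def run_child():
    """Run the child script and return its stdout as a list of lines."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return result.stdout.splitlines()


if __name__ == "__main__":
    for line in run_child():
        print(line)
```

The atexit line appears last, after the worker has finished, which is why a handler that is supposed to stop a blocked non-daemon worker never gets the chance to run.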


1
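Yes, but I want the threads to be terminated gracefully, giving them a chance to run cleanup code. That is what I expected the atexit handler to do. - Charles Langlois
As mentioned above, you can achieve that by making Worker a daemon thread (super().__init__(daemon=True)). - Jacky1205
As far as I know, daemon threads get no chance to handle their termination gracefully, e.g. to clean up any resources they may hold. They are killed mercilessly when the main thread exits and are not considered at all during runtime shutdown. See here for the problems daemon threads can cause, for example: https://www.joeshaw.org/python-daemon-threads-considered-harmful/ - Charles Langlois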
1
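atexit is the exception: it lets us do some cleanup before the Python interpreter does its real finalization. At that point the interpreter is still fully intact (https://github.com/python/cpython/blob/master/Python/pylifecycle.c#L1276). - Jacky1205
As proof, you can add logging after the worker receives Terminate from the queue. You will see that the worker is still alive while the atexit-registered function is executing. - Jacky1205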
1
OK, I see, that makes sense. Either I use daemon threads with atexit, or I have to do the shutdown manually before exiting. Thanks! - Charles Langlois
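The first of the two options summarised above, daemon workers combined with an atexit handler, might be sketched as follows. Worker and shutdown_threads follow the question's code; the `cleaned_up` flag and the `start_workers` helper are illustrative additions, and the flag stands in for whatever real cleanup a worker would perform.

```python
import atexit
import queue
import threading

Terminate = object()  # sentinel, as in the question


class Worker(threading.Thread):
    def __init__(self):
        # Daemon: the interpreter does not wait for this thread on exit,
        # so the atexit handler gets to run while the worker is still alive.
        super().__init__(daemon=True)
        self.queue = queue.Queue()
        self.cleaned_up = False  # illustrative stand-in for real cleanup

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            print("Received message: ", m)
        self.cleaned_up = True  # graceful cleanup happens here


def shutdown_threads(threads):
    for t in threads:
        t.send_message(Terminate)
    for t in threads:
        t.join()  # wait until each worker has finished its cleanup
    print("All threads terminated")


def start_workers(n=5):
    """Hypothetical helper: create the workers and register their shutdown."""
    threads = [Worker() for _ in range(n)]
    atexit.register(shutdown_threads, threads)
    for t in threads:
        t.start()
    return threads


if __name__ == "__main__":
    for t in start_workers():
        t.send_message("Hello")
    # No explicit shutdown: the atexit handler sends Terminate and joins.
```

The graceful path here depends entirely on the atexit handler running; atexit handlers are skipped when the process is killed by an unhandled signal or exits via os._exit(), in which case the daemon workers get no cleanup at all.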
