线程池执行器中的工作线程并不是真正的守护线程。

32
我无法理解的是,虽然 ThreadPoolExecutor 使用了守护线程,但即使主线程退出,工作线程仍将继续运行。
我可以提供一个Python3.6.4的最小示例:
import concurrent.futures
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread_pool = concurrent.futures.ThreadPoolExecutor()
thread_pool.submit(fn)
while True:
    time.sleep(1)
    print("Wow")

主线程和工作线程都是无限循环。因此,如果我使用KeyboardInterrupt来终止主线程,我希望整个程序也会终止。但实际上,即使它是一个守护线程,工作线程仍然在运行。

ThreadPoolExecutor的源代码证实了工作线程是守护线程:

t = threading.Thread(target=_worker,
                     args=(weakref.ref(self, weakref_cb),
                           self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)

此外,如果我手动创建一个守护线程,它就像魔法一样工作:
from threading import Thread
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread = Thread(target=fn)
thread.daemon = True
thread.start()
while True:
    time.sleep(1)
    print("Wow")

所以我真的无法理解这种奇怪的行为。

因为我的测试套件挂起了(由于长时间运行的I/O,例如http服务器套接字),所以我正在恢复CI/CD。这促使我重新实现池,而不使用atexit处理程序:https://gist.github.com/BinarSkugga/edc52b9f3fad44f0a4de8739125e3d3f - BinarSkugga
2个回答

39
突然间...我找到了原因。根据更多的ThreadPoolExecutor源代码:
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

有一个退出处理程序,它将加入所有未完成的工作线程...


14
如果 daemon=True 没有实现主要目的,那么它的好处是什么? - samthegolden

4

以下是避免此问题的方法。另一种不良设计可以打败另一种不良设计。只有当工作线程不会损坏任何对象或文件时,人们才会编写daemon=True

在我的情况下,我用单个工作进程创建了TreadPoolExecutor,并且在单个submit之后,我只是从队列中删除了新创建的线程,这样解释器就不会等待该线程自行停止。请注意,工作线程是在submit之后创建的,而不是在初始化TreadPoolExecutor之后创建的。

import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor

...

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(lambda: self._exec_file(args))
del concurrent.futures.thread._threads_queues[list(executor._threads)[0]]

这段代码在Python 3.8下有效,但在3.9+版本中可能无法正常工作,因为它访问了私有变量。

您可以查看GitHub上的可运行代码示例


非常感谢您发布这个解决方法!现在我每次使用submit()后都会使用concurrent.futures.thread._threads_queues.clear() - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接