线程池执行器 KeyboardInterrupt

5
我有以下代码,使用concurrent.futures.ThreadPoolExecutor以计量的方式启动另一个程序的进程(最多30个)。 我还想要能够在按下ctrl-C停止python进程时停止所有工作。此代码有效,但有一个注意点:我必须按两次ctrl-C。第一次发送SIGINT时,没有任何反应;第二次,我看到“向进程发送SIGKILL”,进程死亡,并且它起作用了。我的第一个SIGINT发生了什么?
execution_list = [['prog', 'arg1'], ['prog', 'arg2']] ... etc
processes = []

def launch_instance(args):
    process = subprocess.Popen(args)
    processes.append(process)
    process.wait()

try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
        results = list(executor.map(launch_instance, execution_list))
except KeyboardInterrupt:
    print('sending SIGKILL to processes')
    for p in processes:
        if p.poll() is None: #If process is still alive
            p.send_signal(signal.SIGKILL)

可能会修复此问题: https://dev59.com/-XUOtIcB2Jgan1zn3f1x#72942970 - Erik Aronesty
1个回答

1

我在尝试解决类似问题时偶然发现了您的问题。我不能100%确定它是否能解决您的用例(我没有使用子进程),但我认为它可能会有所帮助。

只要作业仍在运行,您的代码将保留在executor的上下文管理器中。我的猜测是,第一个KeyboardInterrupt将被ThreadPoolExecutor捕获,其默认行为是不启动任何新作业,等待当前作业完成,然后进行清理(可能会重新引发KeyboardInterrupt)。但是,这些进程可能是长时间运行的,因此您可能不会注意到。第二个KeyboardInterrupt随后中断此错误处理。

以下是我如何解决我的问题(在单独的线程中运行无限后台进程)的代码:

from concurrent.futures import ThreadPoolExecutor
import signal
import threading
from time import sleep


def loop_worker(exiting):
    while not exiting.is_set():
        try:
            print("started work")
            sleep(10)
            print("finished work")
        except KeyboardInterrupt:
            print("caught keyboardinterrupt")  # never caught here. just for demonstration purposes


def loop_in_worker():
    exiting = threading.Event()
    def signal_handler(signum, frame):
        print("Setting exiting event")
        exiting.set()

    signal.signal(signal.SIGTERM, signal_handler)
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(loop_worker, exiting)

        try:
            while not exiting.is_set():
                sleep(1)
                print('waiting')
        except KeyboardInterrupt:
            print('Caught keyboardinterrupt')
            exiting.set()
    print("Main thread finished (and thus all others)")


if __name__ == '__main__':
    loop_in_worker()

它使用一个Event来向线程发出停止当前操作的信号。在主循环中,有一个循环用于保持忙碌并检查任何异常。请注意,此循环位于ThreadPoolExecutor的上下文中。

作为奖励,它还通过使用相同的exiting事件来处理SIGTERM信号。

如果在processes.append(process)process.wait()之间添加一个循环以检查信号,则可能也会解决您的用例。这取决于您想对正在运行的进程执行什么操作。

如果您从命令行运行我的脚本并按ctrl-C键,则应该会看到类似以下内容:

started work
waiting
waiting
^CCaught keyboardinterrupt

   # some time passes here

finished work
Main thread finished (and thus all others)

我的解决方案的灵感来自于这篇博客文章

我有一个使用案例,在其中在threadpoolexecutor中调用API,即使按两次Ctrl+C..获取已经进行的API请求的结果需要时间,然后它停止。是否有任何方法可以立即退出而不等待已经完成的请求?此外,这是否是应该提升上游的问题? - Simplecode
嗨,Simplecode。我认为这里描述的东西不是问题,而是按设计来的。因此,在上游提出问题可能并不是需要的。关于你的问题:我不确定,但我认为如果你发布一个单独的问题,并提供有关你的设置和你尝试过的内容的更多细节,有人就能回答它。你尝试的代码越完整,有经验的人就越容易看出发生了什么,并且可以对其进行处理。最好的祝福,FlorianK。 - FlorianK
在Win10和Python3.7中无法运行。 - James
嗨@詹姆斯,我无法测试它(这里没有Windows机器),但也许你可以尝试使用signal.SIGINT或signal.CTRL_BREAK_EVENT而不是signal.SIGTERM。 - FlorianK

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接