Python: 捕捉Ctrl-C命令。提示“是否真的要退出(y / n)”,如果不退出则恢复执行。

44

我有一个可能需要很长时间才能执行完的程序。在主模块中,我有以下代码:

import signal
def run_program()
   ...time consuming execution...

def Exit_gracefully(signal, frame):
    ... log exiting information ...
    ... close any open files ...
    sys.exit(0)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, Exit_gracefully)
    run_program()
这个可以正常运行,但我想在捕获SIGINT时暂停执行,提示用户是否真的想退出,并在run_program() 中恢复到之前离开的地方,如果他们决定不想退出的话。 我能想到的唯一方法是在单独的线程中运行程序,让主线程等待并准备捕获SIGINT。 如果用户想要退出,主线程可以进行清理并终止子线程。 有更简单的方法吗?

2
退出代码周围的 if raw_input("确定要退出吗?y/n").lower().startswith('y'): 有什么问题吗? - user2357112
4个回答

76

Python的信号处理程序似乎并不是真正的信号处理程序;也就是说,在C处理程序已经返回并在正常流程中发生后才会发生。因此,您应该尝试将退出逻辑放在信号处理程序中。由于信号处理程序在主线程中运行,它也会阻塞执行。

像这样的代码似乎可以很好地工作。

import signal
import time
import sys

def run_program():
    while True:
        time.sleep(1)
        print("a")

def exit_gracefully(signum, frame):
    # restore the original signal handler as otherwise evil things will happen
    # in raw_input when CTRL+C is pressed, and our signal handler is not re-entrant
    signal.signal(signal.SIGINT, original_sigint)

    try:
        if raw_input("\nReally quit? (y/n)> ").lower().startswith('y'):
            sys.exit(1)

    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)

    # restore the exit gracefully handler here    
    signal.signal(signal.SIGINT, exit_gracefully)

if __name__ == '__main__':
    # store the original SIGINT handler
    original_sigint = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, exit_gracefully)
    run_program()

该代码在raw_input期间恢复了原始信号处理程序; raw_input本身不可重入,重新进入会导致RuntimeError:can't re-enter readlinetime.sleep中抛出,这是我们不想要的,因为它比KeyboardInterrupt更难捕获。 相反,我们让连续2个Ctrl-C引发KeyboardInterrupt


2
哇!这真的很酷。然而,我认为从提示符中使用crtl-c的能力应该放在装饰器中,因为它使代码更加清晰易懂。是否可以将其作为答案提供(因为我无法编辑您的答案以添加此替代方式)? - mr2ert
装饰器有一些神奇的作用...我会在编写这个模块时保留我的装饰器版本(因为它非常好 :) )。不过我想指出,您可以在不更改信号处理程序的情况下完成此操作,只需在提示中输入 y 才能退出。 - mr2ert
很棒,这个可行。Antti,我在运行你的代码时遇到了一个问题,退出信号处理程序会导致IOERROR:中断函数调用。只有当我将睡眠时间设置为0.001秒时才能正常工作,如果设置为0.01或更高,则会出现错误。据我所知,这只是在Windows上出现的问题,在Cygwin中代码可以正常运行。 - Colin M
2
如果我们有多线程应用程序,则此策略无效。 - Ciasto piekarz
1
@san 此外,信号应该在大多数情况下能够在多线程应用程序中正常工作,其中有一个主线程处理所有输入,但是信号处理程序必须在该主线程中设置... - Antti Haapala -- Слава Україні
显示剩余12条评论

6

好的,抱歉...现在我没有时间,尽快的时候,我会听从你的建议,谢谢。 - Marc
在Python 3.7.3版本中,对我来说不会打断time.sleep()调用。 - Ben Slade

0

当过程结束时,执行某些操作

假设您只想在任务结束后过程将执行某些操作

import time

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    @staticmethod
    def do_something():
        try:
            time.sleep(5)
        except:
            pass

with TestTask('Hello World') as task:
    task.do_something()

当进程离开with时,即使发生KeyboardInterrupt,也会运行__exit__

如果您不想看到错误,请添加try ... except ...

@staticmethod
def do_something():
    try:
        time.sleep(5)
    except:
        pass

暂停,继续,重置等

我没有完美的解决方案,但这可能对您有用。

这意味着将您的进程分成许多子进程并保存已完成的进程。因为您已经发现它已经完成,所以不会再次执行。

import time
from enum import Enum

class Action(Enum):
    EXIT = 0
    CONTINUE = 1
    RESET = 2

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    def do_something(self):
        tuple_job = (self._foo, self._bar)  # implement by yourself
        list_job_state = [0] * len(tuple_job)
        dict_keep = {}  # If there is a need to communicate between jobs, and you don’t want to use class members, you can use this method.
        while 1:
            try:
                for idx, cur_process in enumerate(tuple_job):
                    if not list_job_state[idx]:
                        cur_process(dict_keep)
                        list_job_state[idx] = True
                if all(list_job_state):
                    print('100%')
                    break
            except KeyboardInterrupt:
                print('KeyboardInterrupt. input action:')
                msg = '\n\t'.join([f"{action + ':':<10}{str(act_number)}" for act_number, action in
                                   enumerate([name for name in vars(Action) if not name.startswith('_')])
                                   ])
                case = Action(int(input(f'\t{msg}\n:')))
                if case == Action.EXIT:
                    break
                if case == Action.RESET:
                    list_job_state = [0] * len(tuple_job)

    @staticmethod
    def _foo(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('1%')
        print('2%')
        print('...')
        print('60%')
        keep_dict['status_1'] = 'status_1'
        return True

    @staticmethod
    def _bar(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('61%')
        print(keep_dict.get('status_1'))
        print('...')
        print('99%')
        return True

with TestTask('Hello World') as task:
    task.do_something()

控制台

input action number:2
Task Start!:Hello World
1%
2%
...
60%
KeyboardInterrupt. input action:
        EXIT:     0
        CONTINUE: 1
        RESET:    2
:1
61%
status_1
...
99%
100%
Task End!


0
代码
import signal
import time

flag_exit = False


def signal_handler(signal, frame):
    if input("  Ctrl+C detected. Do you really want to exit? y/n > ").lower().startswith('y'):
        global flag_exit
        flag_exit = True
        print("Wait for graceful exit...")


signal.signal(signal.SIGINT, signal_handler)


def get_time():
    from datetime import datetime
    now = datetime.now()
    dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
    return dt_string


def process():
    for i in range(999):
        if flag_exit:
            break
        print(f"[{get_time()}] start process: {i}")
        time.sleep(5)
        print(f"[{get_time()}] end process: {i}")
        print()


if __name__ == "__main__":
    process()

输出

[2023-07-11 10:42:21] start process: 0
[2023-07-11 10:42:26] end process: 0

[2023-07-11 10:42:26] start process: 1
^C  Ctrl+C detected. Do you really want to exit? y/n > n
[2023-07-11 10:42:31] end process: 1

[2023-07-11 10:42:31] start process: 2
^C  Ctrl+C detected. Do you really want to exit? y/n > y
Wait for graceful exit...
[2023-07-11 10:42:36] end process: 2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接