Python的多进程池中的键盘中断问题

166

如何使用Python的多进程池处理KeyboardInterrupt事件?这里有一个简单的示例:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

运行以上代码时,当我按下^C时,会引发KeyboardInterrupt,但进程在那一点上被挂起,我必须在外部强制终止它。

我希望能够随时按下^C并使所有进程优雅地退出。


我使用psutil解决了我的问题,你可以在这里看到解决方案:https://dev59.com/II7ea4cB1Zd3GeqPAF17#45259908 - Tiago Albineli Motta
11个回答

145

这是一个Python的bug。 在使用threading.Condition.wait()等待条件时,永远不会发送KeyboardInterrupt。 复现:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

当调用wait()方法时,KeyboardInterrupt异常只会在wait()方法返回时才会被触发,但由于wait()永远不会返回,因此中断永远不会发生。通常情况下,KeyboardInterrupt应该可以中断条件等待。

需要注意的是,如果指定了超时时间,则不会出现这种情况;cond.wait(1)将立即接收中断。因此,一种解决方法是指定超时时间。要做到这一点,请替换

    results = pool.map(slowly_square, range(40))

使用

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似。


4
这个 bug 在官方 Python 追踪器上有吗?我很难找到它,可能只是我的搜索词不够准确。 - Joseph Garvin
18
这个 bug 已经被记录在 Issue 8296 - Andrey Vlasovskikh
8
这并没有完全解决问题。有时候我按Control+C会得到预期的行为,但有时候不会。我不确定原因是什么,但看起来可能是随机地由其中一个进程接收了KeyboardInterrupt信号,只有当父进程捕获它时我才能得到正确的行为。 - Ryan C. Thompson
12
在Windows平台上,我使用Python 3.6.1时无法成功运行。当我按Ctrl-C时,会出现大量的堆栈跟踪和其他垃圾信息,与没有此类解决方案时相同。事实上,我已尝试了该线程中提供的所有解决方案,但似乎都不起作用... - szx
4
2019年时依然存在问题,像同时进行IO这样的并行操作还是一个新颖的想法 :/ - Akos Lukacs
显示剩余6条评论

67

根据我最近的发现,最佳解决方案是设置工作进程完全忽略SIGINT,并将所有清理代码限制在父进程中。这可以为空闲和繁忙的工作进程都解决问题,并且在子进程中不需要任何错误处理代码。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

可以在以下链接找到解释和完整示例代码:http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt


5
嗨,John。你的解决方案没有达到和我的解决方案相同的效果,是的,我承认我的方案有些复杂。你的方案在主进程中使用 time.sleep(10) 来掩盖问题。如果你删除这个 sleep,或者等待进程尝试加入(join)池,为了确保作业已完成,那么你仍然会遇到同样的问题,即主进程无法在等待池中的 join 操作期间接收到 KeyboardInterrupt。 - bboe
2
那么这更像是一个繁忙的等待(可能在检查之间有小睡眠),通过另一种方法轮询进程完成情况,而不是使用join?如果是这样的话,也许最好将此代码包含在您的博客文章中,因为您可以确保在尝试加入之前所有工作人员都已完成。 - bboe
5
这个不起作用。只有子进程收到信号。父进程从未接收到信号,所以 pool.terminate() 没有被执行。让子进程忽略信号没有任何作用。@Glenn的答案解决了这个问题。 - Cerin
2
我的版本在 https://gist.github.com/admackin/003dd646e5fadee8b8d6 上;它不会在除了中断以外的情况下调用 .join(),而是手动检查 .apply_async() 的结果,使用 AsyncResult.ready() 来判断是否准备就绪,这意味着我们已经干净地完成了。 - Andy MacKinlay
@Cerin 我试图确认这个解决方案在某些地方会出现问题,然后我发现了这个链接 https://www.win.tue.nl/~aeb/linux/lk/lk-10.html#ss10.2。我相信如果信号被发送到进程组,那么信号将被发送到领导者和子进程。如果是这样的话,那么除了领导者以外忽略信号将会是一个非常好的解决方案。 - trcarden
显示剩余3条评论

33

由于某些原因,只有继承自基类Exception的异常才能被正常处理。作为解决方法,您可以将KeyboardInterrupt重新抛出为一个Exception实例:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常情况下,您将获得以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

因此,如果您按下^C,您将会得到:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

2
看起来这不是一个完整的解决方案。如果在 multiprocessing 执行自己的 IPC 数据交换时到达 KeyboardInterrupt ,那么 try..catch 将不会被激活(显然)。 - Andrey Vlasovskikh
你可以用 return 替换 raise KeyboardInterruptError。你只需要确保子进程在接收到 KeyboardInterrupt 后尽快结束。返回值似乎被忽略了,在 main 中仍然会接收到 KeyboardInterrupt。 - Bernhard

18

所投票的答案未解决核心问题,而是类似的副作用。

Jesse Noller,多进程库的作者,在一篇旧的博客文章中讲解了如何在使用multiprocessing.Pool时正确处理CTRL+C。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

我发现ProcessPoolExecutor也有同样的问题。我能找到的唯一解决方法是从future内部调用os.setpgrp() - benathon
1
当然,唯一的区别是ProcessPoolExecutor不支持初始化函数。在Unix上,您可以通过在创建池之前在主进程上禁用sighandler并在之后重新启用它来利用fork策略。在pebble中,默认情况下我会在子进程上静音SIGINT。我不知道他们为什么不对Python Pools执行相同的操作。最后,用户可以重新设置SIGINT处理程序,以防他/她想要伤害自己。 - noxdafox
2
这个解决方案似乎可以防止Ctrl-C中断主进程。 - Paul Price
1
我刚在Python 3.5上测试过,它可以正常工作。你使用的是哪个版本的Python?你用的是什么操作系统? - noxdafox

17

很多答案要么过时,要么似乎不适用于在Windows上执行像Pool.map这样的阻塞方法(我正在运行3.8.5版本的Python)。以下是我的解决方案:

  1. 在主进程中发出调用signal.signal(signal.SIGINT, signal.SIG_IGN)以完全忽略Ctrl-C。
  2. 处理池将使用一个池初始化器来初始化每个处理器:全局变量ctrl_c_entered将设置为False,并且将发出一个调用signal.signal(signal.SIGINT, signal.SIG_IGN)来最初忽略Ctrl-C。此调用的返回值将被保存;这是重新建立默认处理程序的原始处理程序,当重新建立时允许处理KyboardInterrupt异常。
  3. 可以使用装饰器handle_ctrl_c来装饰多进程函数和方法,并应在输入Ctrl-C时立即退出。该装饰器将测试全局ctrl_c_entered标志是否已设置,如果是,则不会运行函数/方法,而是将返回一个KeyboardInterrupt异常实例。否则,将建立一个try/catch处理程序来处理KeyboardInterrupt,并调用装饰的函数/方法。如果输入了Ctrl-C,则全局ctrl_c_entered将设置为True,并返回一个KeyboardInterrupt异常实例。无论如何,在返回装饰器之前,都将重新建立SIG_IGN处理程序。

实质上,所有提交的任务将被允许启动,但一旦输入了Ctrl-C,它们将立即终止并返回一个KeyBoardInterrupt异常的返回值。主进程可以测试返回值是否存在这样的返回值来检测是否输入了Ctrl-C。

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

打印:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]

确认在Windows上的Python 3.7.7上按预期工作。感谢您的发布! - Bruce Lamond
如上所示,对于列表中的所有未处理成员,它会返回KeyboardInterrupt(),这将被解读为True并且不会打印任何内容。我发现将其替换为返回None更加有用。 - Jonathan Rys
@JonathanRys 如果被使用的工作函数(在我的例子中是slowly_square)返回None,那该怎么办呢?在这种情况下,能够区分工作函数正常完成和由于之前的键盘中断而未运行的情况对于函数main来说更有用。当发生键盘中断时,handle_ctrl_c返回什么并不重要,只要它能与工作函数可能返回的任何可能值区分开即可。也许这应该是一个特别定义的空类的实例,例如UnProcessed - Booboo

10

1
这也必须在每个工作进程上完成,如果在初始化多进程库时发生KeyboardInterrupt,则可能仍会失败。 - MarioVilas

5
似乎在多进程处理中出现了两个让异常情况变得烦人的问题。首先(由 Glenn 指出),为了获得立即响应(即不用完成整个列表的处理),您需要使用带有超时的 map_async,而不是 map。其次(由 Andrey 指出),multiprocessing 无法捕获不继承自 Exception 的异常(例如 SystemExit)。因此,这就是我的解决方案,可以同时解决这两个问题:
import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    https://dev59.com/UnM_5IYBdhLWcg3wXx5N

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

1
我没有注意到任何性能损失,但在我的情况下,这个“函数”相当长寿(数百秒)。 - Paul Price
这实际上不再是情况了,至少从我的角度和经验来看。如果在各个子进程中捕获键盘异常,并在主进程中再次捕获它,则可以继续使用map,一切都很好。 @Linux Cli Aik 在下面提供了一个解决方案,可以产生这种行为。如果主线程依赖于子进程的结果,则不总是希望使用map_async - Code Doggo

5

我是Python的新手。我到处寻找答案,偶然发现了这篇和其他一些博客和Youtube视频。我试图复制粘贴作者上面的代码,并在我的Windows 7 64位Python 2.7.13上重现它。它接近我想要实现的。

我让我的子进程忽略ControlC并使父进程终止。看起来绕过子进程确实避免了这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

pool.terminate() 开始的部分似乎从未被执行。


我也刚刚弄明白这个问题!我真的认为这是解决这种问题的最佳方案。被接受的解决方案强制用户使用map_async,而我并不特别喜欢这样做。在许多情况下,就像我的情况一样,主线程需要等待各个进程完成。这就是为什么map存在的原因之一! - Code Doggo

4
您可以尝试使用Pool对象的apply_async方法,就像这样:
import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

输出:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

这种方法的优点是,在中断发生前处理的结果将会在结果字典中返回:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

1
辉煌而完整的例子 - eMTy
1
优秀的例子。 - michaelvdnest
谢谢。我正在尝试弄清楚如何将其推广到多个参数。特别是,为什么在 jobs[value] = pool.apply_async(input_function, [value]) 中传递 [value] 而不是 value - amball
1
有没有可能让被中断的进程返回一个中间结果呢? - 2080

4

像魔法一样好用。这是一个干净的解决方案,不是某种黑客行为。顺便说一下,其他人提出的使用 .get(99999) 的技巧会严重损害性能。 - Walter
我没有注意到使用超时会有任何性能惩罚,尽管我一直使用的是9999而不是999999。唯一的例外是当引发一个不继承自Exception类的异常时:那么你必须等待超时时间到达。解决这个问题的方法是捕获所有异常(请参见我的解决方案)。 - Paul Price

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接