函数调用超时

510

我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。

我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?


5
这个库看起来被维护得很好:https://pypi.org/project/wrapt-timeout-decorator/ - guettli
24个回答

339

如果您正在运行UNIX系统,可以使用signal软件包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

在调用 signal.alarm(10) 后的 10 秒钟,处理程序将被调用。这会引发一个异常,您可以从常规 Python 代码中拦截它。

该模块与线程不兼容(但是,谁兼容呢?)

请注意,由于我们在超时发生时引发异常,因此它可能会被函数内部捕获并忽略,例如以下函数之一:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

5
我使用Python 2.5.4版本时出现了以下错误:Traceback (most recent call last): File "aa.py", line 85, in func signal.signal(signal.SIGALRM, handler) AttributeError: 'module' object has no attribute 'SIGALRM'(这个错误可能是由于)模块中没有'SIGALRM'属性导致的。 - flypen
19
那是因为在Windows平台上,signal.alarm和相关的SIGALRM不可用。 - Double AA
4
如果有很多进程,每个进程都调用signal.signal——它们会正常工作吗?每个signal.signal调用不会取消"并发"的一个吗? - brownian
24
我同意有关多线程的警告。signal.alarm仅适用于主线程。我曾尝试在Django视图中使用它,但立即失败,并出现关于仅适用于主线程的措辞。 - JL Peyret
2
如果你需要取消闹钟,将其设置为0:signal.alarm(0)(参见https://dev59.com/BIXca4cB1Zd3GeqPGlEp)。 - Michele Piccolini
显示剩余9条评论

234
你可以使用 multiprocessing.Process 来实现这一点。

代码

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

72
如何获取目标方法的返回值? - bad_keypoints
9
如果被调用的函数陷入I/O阻塞,这似乎无法正常工作。 - sudo
4
@bad_keypoints 参见此答案:https://dev59.com/Amkv5IYBdhLWcg3wsC7i#10415215 基本上,您需要将答案放入列表中并传递它。 - Peter
1
@sudo 然后删除join()。这样可以使你的x个并发子进程一直运行,直到它们完成工作,或者在join(10)中定义的数量。如果你有10个进程的阻塞I/O,在使用join(10)时,你已经设置它们等待所有已启动的进程的最大值为10。像这个例子一样使用daemon标志http://stackoverflow.com/a/27420072/2480481。当然,你也可以直接将标志`daemon=True`传递给`multiprocessing.Process()`函数。 - m3nda
4
@ATOzTOA 这种解决方案的问题,至少对于我的目的来说,是它可能不允许子进程自己清理。根据终止函数 terminate() ... 注意,退出处理程序和 finally 子句等将不会执行。请注意,进程的后代进程将不会被终止 - 它们只会变成孤儿进程。 - abalcerek
显示剩余13条评论

118

如何调用该函数或包装它,使得如果它的执行时间超过5秒,脚本将取消它?

我发布了一个使用装饰器和 threading.Timer 解决此问题的代码片段。以下是具体细节。

导入模块和设置兼容性

这段代码经过 Python 2 和 3 的测试,并应该在 Unix/Linux 和 Windows 平台下都能工作。

首先是导入所需模块。为保持代码一致性,我们使用了以下语句:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

使用版本无关的代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

现在我们已经从标准库中导入了我们的功能。

exit_after 装饰器

接下来,我们需要一个函数来终止子线程中的 main()

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

这里是装饰器本身:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

使用方法

这里是直接回答您关于在5秒后退出的问题的用法:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

第二个函数调用不会完成,而是应该以回溯方式退出进程!

KeyboardInterrupt 并不总是能停止正在休眠的线程

请注意,在 Windows 上的 Python 2 中,sleep 并不总是会被键盘中断所中断:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

除非显式检查PyErr_CheckSignals(),否则它不太可能中断运行在扩展中的代码。请参阅Cython、Python和KeyboardInterrupt被忽略

无论如何,我都会避免让线程睡眠超过一秒钟 - 在处理器时间中这是一个时代。

如果函数执行时间超过5秒,我该如何调用该函数或将其包装起来以使脚本取消执行并执行其他操作?

要捕获并执行其他操作,可以捕获KeyboardInterrupt

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

3
为什么我必须调用 thread.interrupt_main(),不能直接引发异常? - Anirban Nag 'tintinmj'
1
有没有想过用multiprocessing.connection.Client来封装它?- 尝试解决:https://dev59.com/HLbna4cB1Zd3GeqPWSiC - wwii
1
当我尝试使用不同的函数而不是倒计时时,它会卡在thread.interrupt_main()上。例如,在计数器内部运行一个subprocess(),即使计时器已完成,它也没有终止,我不得不按下^C - alper
1
如何停止所有进程但不引发KeyboardInterrupt错误? - WJA
如果这样做,那就毫无意义了——我们的装饰器会出问题。 - Russia Must Remove Putin
显示剩余3条评论

68

我有一个不同的建议,它是一个纯函数(具有与线程建议相同的API),并且似乎运行良好(基于这个帖子上的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

3
您还应该恢复原始的信号处理程序。请参见https://dev59.com/f3RB5IYBdhLWcg3w1Kv0#494273#comment8635219_494273。 - Martin Konecny
10
注意:Unix信号方法只在主线程中应用有效,如果在子线程中应用会抛出异常且不起作用。 - Martin Konecny
13
这不是最佳解决方案,因为它只适用于Linux系统。 - max
30
不准确,它适用于任何符合POSIX标准的Unix系统。我认为你的评论应更准确地表达为:不适用于Windows系统。 - Chris Johnson
16
应避免将kwargs设置为空字典。在函数的默认参数上,Python的一个常见坑点是它们是可变的。这意味着该字典将在所有对timeout的调用之间共享。更好的做法是将默认值设置为None,并在函数的第一行添加kwargs = kwargs or {}。Args可以使用,因为元组是不可变的。 - scottmrogowski
显示剩余2条评论

43

我在搜索单元测试的超时调用时发现了这个帖子。在回答或第三方包中没有找到任何简单的东西,因此我编写了下面的装饰器,您可以直接将其放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

要超时测试或其他你喜欢的函数只需要简单地这样做:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

23
请注意,这将不会在超时后终止函数的执行! - Sylvain
1
请注意,在Windows上,这将产生一个全新的进程 - 如果依赖项设置需要很长时间,则会大大减少超时时间。 - Russia Must Remove Putin
2
是的,这需要一些调整。它会让线程永远运行下去。 - sudo
4
我不知道这是否是最好的方法,但你可以在func_wrapper函数内部尝试使用try/catch捕获异常,并在catch后执行pool.close()以确保线程无论如何都会停止。之后你可以抛出TimeoutError或其他你想要的异常。对我来说似乎行得通。 - sudo
2
这很有用,但是一旦我重复多次后,就会出现“RuntimeError:无法启动新线程”。如果我忽略它,它仍然有效吗?还是有其他方法可以解决这个问题?提前致谢! - 56-
显示剩余2条评论

37

发现在pypi上有一个名为stopit的包,似乎很好地处理了超时。

我喜欢@stopit.threading_timeoutable装饰器,它添加了一个timeout参数到被装饰的函数中,它做你所期望的事情,停止函数的执行。

点击链接查看: https://pypi.python.org/pypi/stopit


库声称,在Windows中某些功能无法正常工作。 - Stefan Simik
2
对于可能像我一样会感到困惑的人: stopit.utils.TimeoutException 不会停止你的代码!代码在此之后会正常运行!我在一个正常运行的程序里浪费了30分钟。 非常好的答案! - Charalamm
使用stopit-1.1.2,基本的超时装饰器:@stopit.threading_timeoutable(default='not finished')在Linux和Windows上都能正常工作。如果您只需要一个简单的超时,那么这是一个简单而优秀的解决方案。 - Bence Kaulics

27

我是 wrapt_timeout_decorator 的作者。

在 Linux 下,很多解决方案看起来都能正常工作,因为我们有 fork()signals()。但在 Windows 上,情况会有所不同。而在 Linux 上的子线程中,你不能再使用信号量。

为了在 Windows 下产生一个进程,它需要是可 pickle 的,而很多包装过的函数或类方法则不是可 pickle 的。

所以你需要使用更好的 pickler,例如 dill 和 multiprocess(而非 pickle 和 multiprocessing)——这就是为什么你不能使用 ProcessPoolExecutor(或者只能使用受限的功能)。

对于超时本身,你需要定义超时的含义——因为在 Windows 上,启动进程将需要相当长的时间(且无法确定)。这在短时间内会比较棘手。假设启动进程大约需要 0.5 秒钟(非常容易!)。如果你给出了 0.2 秒的超时时间,应该发生什么?

  • 函数是否应该在 0.5 + 0.2 秒后超时(也就是让方法运行 0.2 秒)?
  • 还是调用的进程在 0.2 秒后超时(在这种情况下,封装的函数将始终超时,因为此时它甚至还没有启动)?

同时嵌套装饰器可能会很棘手,而且您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,需要考虑所有这些问题(并进行测试)。

其他问题包括将异常传递回调用者,以及日志记录问题(如果在装饰函数中使用-在另一个进程中记录到文件是不受支持的)。

我试图涵盖所有边缘情况,您可以查看wrapt_timeout_decorator软件包,或者至少测试受其单元测试启发的您自己的解决方案。

@ Alexis Eggermont-不幸的是,我没有足够的积分发表评论-也许其他人可以通知您-我认为我解决了您的导入问题。


1
这对我来说真是救命稻草!我的问题是有时多进程工作器会无缘无故地停滞不前,并在睡眠状态下消耗大量内存和CPU。尝试了各种多进程包装器,其中有一个池超时选项,但每个都给我带来了其他不同的问题,比如在池终止后未能杀死进程。现在有了这个装饰器,只需长时间超时后,函数就会被杀死并在其中生成的进程。它会给我一个BrokenPipeError,表示突然关闭池,但解决了我的主要问题。谢谢!有没有处理BrokenPipeError的建议? - Arjun Sankarlal
2
@Arjun Sankarlal:当然,如果工作进程被终止,管道就会中断。您需要在调度程序任务中捕获中断的管道错误并进行适当的清理。 - bitranox
1
是的,我明白了,并且我在try/except中使用了BrokenPipeError,但它没有被捕获。所以我正在一个Web服务器中使用它。我有一个捕获BrokenPipeError和一般异常的方法。所以当超时发生时,我返回的是一般异常而不是断开的管道错误。但是几秒钟后,服务器在控制台上打印出BrokenPipeError,并且它可以无问题地处理其他请求。也许我可以在之后引入一个延迟来检查池是否已经断开,然后再返回!? - Arjun Sankarlal
感谢您的库。在我的情况下,这是最佳解决方案。 - jonsbox

24

有很多建议,但没有使用concurrent.futures的,我认为这是处理此问题最易读的方法。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

易于阅读和维护。

我们创建一个池,提交单个进程,然后等待最多5秒钟,然后引发TimeoutError,您可以捕获并根据需要处理它。

原生支持Python 3.2+,并已回溯到2.7(使用pip install futures安装)。

在线程和进程之间切换就像用ProcessPoolExecutor替换为ThreadPoolExecutor一样简单。

如果您想在超时时终止进程,建议查看Pebble


3
“警告:如果超时,此功能将不会终止”是什么意思? - Scott Stafford
6
即使引发TimeoutError,进程/线程也不会立即结束。因此,进程或线程仍会尝试运行到完成,并且不会在超时时自动将控制权返回给您。 - Brian
这样做可以让我在此时保存任何中间结果吗?例如,如果我有一个递归函数,我将超时设置为5,在那段时间内我有部分结果,那么我如何编写函数以在超时时返回部分结果呢? - SumNeuron
我正在使用这个,但是我有1000个任务,每个任务允许5秒超时。我的问题是,核心会在永远不结束的任务上堵塞,因为超时仅应用于所有任务的总和而不是单个任务。据我所知,concurrent.futures没有提供解决方案。 - Bastiaan

21

在 @piro 的答案基础上进行改进,您可以构建一个上下文管理器。这将允许非常易读的代码,在成功运行后禁用警报信号(设置 signal.alarm(0))。

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise TimeoutError(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

使用示例:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds

1
这确实是一个很好的方法。为了完整起见,需要导入以下内容才能使其工作:from contextlib import contextmanager - mdev
3
这个上下文管理器的当前实现存在一个问题,即在上下文内部代码块中出现异常可能导致信号警报未被禁用。为了解决这个问题,应该添加try + finally语句块,类似于下面我的超时功能修饰器(https://dev59.com/f3RB5IYBdhLWcg3w1Kv0#66515961)。 - mdev
这种方法似乎不太可靠。当我运行非常计算密集的代码时,超时似乎永远不会发生。但是,当我在调试器中使用断点暂停该进程时,最终会超时。 - ShnitzelKiller
@ShnitzelKiller 来自 signal 文档:在 C 中实现的长时间运行的计算(例如在大量文本上进行正则表达式匹配)可能会在接收到任何信号时无限期地不间断运行。当计算完成时,Python 信号处理程序将被调用。 - alex

18

很棒,易于使用且可靠的PyPi项目timeout-decorator (https://pypi.org/project/timeout-decorator/)

安装

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

6
我感谢清晰的解决方案。但是,有没有人可以解释一下这个库是如何工作的,特别是在处理多线程时。个人而言,我害怕使用未知的机制来处理线程或信号。 - wsysuper
@wsysuper 这个库有两种操作模式:打开新线程或者新的子进程(应该是线程安全的)。 - Gil
似乎它在Linux下不起作用,就像其他基于signal.SIGALRM的解决方案一样。 - Mathieu Roger
1
这个解决方案在Python 3.7.6上不起作用。我想你应该知道!这对我来说太糟糕了。 - Andre Carneiro
@Gil 我如何在FastAPI中使用它? - Aadhi Verma

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接