我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。
我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?
我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。
我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?
如果您正在运行UNIX系统,可以使用signal软件包:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
在调用 signal.alarm(10)
后的 10 秒钟,处理程序将被调用。这会引发一个异常,您可以从常规 Python 代码中拦截它。
该模块与线程不兼容(但是,谁兼容呢?)
请注意,由于我们在超时发生时引发异常,因此它可能会被函数内部捕获并忽略,例如以下函数之一:
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
signal.alarm
和相关的SIGALRM
不可用。 - Double AAsignal.signal
——它们会正常工作吗?每个signal.signal
调用不会取消"并发"的一个吗? - browniansignal.alarm(0)
(参见https://dev59.com/BIXca4cB1Zd3GeqPGlEp)。 - Michele Piccolinimultiprocessing.Process
来实现这一点。
代码
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
join()
。这样可以使你的x个并发子进程一直运行,直到它们完成工作,或者在join(10)
中定义的数量。如果你有10个进程的阻塞I/O,在使用join(10)
时,你已经设置它们等待所有已启动的进程的最大值为10。像这个例子一样使用daemon标志http://stackoverflow.com/a/27420072/2480481。当然,你也可以直接将标志`daemon=True`传递给`multiprocessing.Process()`函数。 - m3ndaterminate() ... 注意,退出处理程序和 finally 子句等将不会执行。请注意,进程的后代进程将不会被终止 - 它们只会变成孤儿进程。
- abalcerek我发布了一个使用装饰器和 threading.Timer
解决此问题的代码片段。以下是具体细节。
这段代码经过 Python 2 和 3 的测试,并应该在 Unix/Linux 和 Windows 平台下都能工作。
首先是导入所需模块。为保持代码一致性,我们使用了以下语句:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
使用版本无关的代码:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
现在我们已经从标准库中导入了我们的功能。
exit_after
装饰器接下来,我们需要一个函数来终止子线程中的 main()
:
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
这里是装饰器本身:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
这里是直接回答您关于在5秒后退出的问题的用法:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
第二个函数调用不会完成,而是应该以回溯方式退出进程!
KeyboardInterrupt
并不总是能停止正在休眠的线程请注意,在 Windows 上的 Python 2 中,sleep
并不总是会被键盘中断所中断:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
除非显式检查PyErr_CheckSignals()
,否则它不太可能中断运行在扩展中的代码。请参阅Cython、Python和KeyboardInterrupt被忽略
无论如何,我都会避免让线程睡眠超过一秒钟 - 在处理器时间中这是一个时代。
如果函数执行时间超过5秒,我该如何调用该函数或将其包装起来以使脚本取消执行并执行其他操作?
要捕获并执行其他操作,可以捕获KeyboardInterrupt
。
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
thread.interrupt_main()
,不能直接引发异常? - Anirban Nag 'tintinmj'multiprocessing.connection.Client
来封装它?- 尝试解决:https://dev59.com/HLbna4cB1Zd3GeqPWSiC - wwiithread.interrupt_main()
上。例如,在计数器内部运行一个subprocess()
,即使计时器已完成,它也没有终止,我不得不按下^C
。 - alper我有一个不同的建议,它是一个纯函数(具有与线程建议相同的API),并且似乎运行良好(基于这个帖子上的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
timeout
的调用之间共享。更好的做法是将默认值设置为None
,并在函数的第一行添加kwargs = kwargs or {}
。Args可以使用,因为元组是不可变的。 - scottmrogowski我在搜索单元测试的超时调用时发现了这个帖子。在回答或第三方包中没有找到任何简单的东西,因此我编写了下面的装饰器,您可以直接将其放入代码中:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
要超时测试或其他你喜欢的函数只需要简单地这样做:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
pool.close()
以确保线程无论如何都会停止。之后你可以抛出TimeoutError
或其他你想要的异常。对我来说似乎行得通。 - sudo发现在pypi上有一个名为stopit
的包,似乎很好地处理了超时。
我喜欢@stopit.threading_timeoutable
装饰器,它添加了一个timeout
参数到被装饰的函数中,它做你所期望的事情,停止函数的执行。
stopit.utils.TimeoutException
不会停止你的代码!代码在此之后会正常运行!我在一个正常运行的程序里浪费了30分钟。
非常好的答案! - Charalamm@stopit.threading_timeoutable(default='not finished')
在Linux和Windows上都能正常工作。如果您只需要一个简单的超时,那么这是一个简单而优秀的解决方案。 - Bence Kaulics我是 wrapt_timeout_decorator 的作者。
在 Linux 下,很多解决方案看起来都能正常工作,因为我们有 fork()
和 signals()
。但在 Windows 上,情况会有所不同。而在 Linux 上的子线程中,你不能再使用信号量。
为了在 Windows 下产生一个进程,它需要是可 pickle 的,而很多包装过的函数或类方法则不是可 pickle 的。
所以你需要使用更好的 pickler,例如 dill 和 multiprocess(而非 pickle 和 multiprocessing)——这就是为什么你不能使用 ProcessPoolExecutor
(或者只能使用受限的功能)。
对于超时本身,你需要定义超时的含义——因为在 Windows 上,启动进程将需要相当长的时间(且无法确定)。这在短时间内会比较棘手。假设启动进程大约需要 0.5 秒钟(非常容易!)。如果你给出了 0.2 秒的超时时间,应该发生什么?
同时嵌套装饰器可能会很棘手,而且您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,需要考虑所有这些问题(并进行测试)。
其他问题包括将异常传递回调用者,以及日志记录问题(如果在装饰函数中使用-在另一个进程中记录到文件是不受支持的)。
我试图涵盖所有边缘情况,您可以查看wrapt_timeout_decorator软件包,或者至少测试受其单元测试启发的您自己的解决方案。
@ Alexis Eggermont-不幸的是,我没有足够的积分发表评论-也许其他人可以通知您-我认为我解决了您的导入问题。
有很多建议,但没有使用concurrent.futures的,我认为这是处理此问题最易读的方法。
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
易于阅读和维护。
我们创建一个池,提交单个进程,然后等待最多5秒钟,然后引发TimeoutError,您可以捕获并根据需要处理它。
原生支持Python 3.2+,并已回溯到2.7(使用pip install futures安装)。
在线程和进程之间切换就像用ProcessPoolExecutor
替换为ThreadPoolExecutor
一样简单。
如果您想在超时时终止进程,建议查看Pebble。
在 @piro 的答案基础上进行改进,您可以构建一个上下文管理器。这将允许非常易读的代码,在成功运行后禁用警报信号(设置 signal.alarm(0))。
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise TimeoutError(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
try:
yield
finally:
signal.alarm(0)
def sleeper(duration):
time.sleep(duration)
print('finished')
使用示例:
In [19]: with timeout(2):
...: sleeper(1)
...:
finished
In [20]: with timeout(2):
...: sleeper(3)
...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
from contextlib import contextmanager
。 - mdevtry
+ finally
语句块,类似于下面我的超时功能修饰器(https://dev59.com/f3RB5IYBdhLWcg3w1Kv0#66515961)。 - mdevsignal
文档:在 C 中实现的长时间运行的计算(例如在大量文本上进行正则表达式匹配)可能会在接收到任何信号时无限期地不间断运行。当计算完成时,Python 信号处理程序将被调用。 - alex很棒,易于使用且可靠的PyPi项目timeout-decorator (https://pypi.org/project/timeout-decorator/)
安装:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()