我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。
我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?
我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。
我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?
如果工作没有完成,我打算使用线程和进程来终止该进程。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a, b, c, *, steps=2, delay=1):
    """Simulate a short, slow job and return ``a + b + c``.

    Args:
        a: First operand.
        b: Second operand.
        c: Third operand.
        steps: How many times to print the progress message and pause.
            Defaults to 2, the value that used to be hard-coded.
        delay: Seconds to sleep after each message. Defaults to 1.

    Returns:
        The result of ``a + b + c``.
    """
    for _ in range(steps):
        print('very time consuming sleep')
        sleep(delay)
    return a + b + c
# test case 2
def worker_2(in_name, *, steps=10, delay=1):
    """Simulate a long, slow job and return a greeting for ``in_name``.

    Args:
        in_name: Name appended to the greeting.
        steps: How many times to print the progress message and pause.
            Defaults to 10, the value that used to be hard-coded.
        delay: Seconds to sleep after each message. Defaults to 1.

    Returns:
        The string ``'hello '`` followed by ``in_name``.
    """
    for _ in range(steps):
        print('very time consuming sleep')
        sleep(delay)
    return 'hello ' + in_name
作为上下文管理器的实际类
class FuncTimer():
    """Run ``fn(*args)`` in a child process and abandon it after ``runtime`` seconds.

    Intended to be used as a context manager: ``start_run()`` launches the
    child and waits for its result, and leaving the ``with`` block makes sure
    the child process is no longer running.

    Attributes are kept public as before: ``fn``, ``args``, ``runtime``,
    ``queue`` (carries the result back from the child) and ``process``.
    """

    # Seconds between liveness checks while waiting for the child's result.
    _POLL_INTERVAL = 0.05

    def __init__(self, fn, args, runtime):
        """Store the call description and prepare (but do not start) the child.

        Args:
            fn: Callable to execute in the child process.
            args: Tuple of positional arguments passed to ``fn``.
            runtime: Maximum number of seconds to wait for the result;
                ``None`` waits until the child finishes.
        """
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        """Child-process entry point: call ``fn`` and publish its return value.

        The call runs directly in the child; an extra worker thread inside
        the child added nothing, because the parent stops the whole process.
        If ``fn`` raises, nothing is put on the queue and the child exits
        with an error, which ``start_run`` reports as ``None``.
        """
        self.queue.put(self.fn(*self.args))

    def __enter__(self):
        return self

    def _stop_process(self, grace=0.0):
        """Make sure the child is gone; do nothing if it was never started.

        Args:
            grace: Seconds to let the child exit on its own before killing it.
        """
        if self.process.pid is None:
            # Never started: kill()/join() would raise on such a process.
            return
        if grace > 0:
            self.process.join(timeout=grace)
        if self.process.is_alive():
            self.process.kill()
        self.process.join()

    def start_run(self):
        """Start the child and wait up to ``runtime`` seconds for its result.

        Returns:
            The value returned by ``fn``, or ``None`` when the child was
            killed for exceeding ``runtime`` or ended without a result
            (for example because ``fn`` raised).
        """
        # Local imports keep this block self-contained; the attribute
        # ``self.queue`` is unrelated to the stdlib ``queue`` module.
        from queue import Empty
        from time import monotonic

        self.process.start()
        deadline = None if self.runtime is None else monotonic() + self.runtime
        out_res = None
        have_result = False
        # Read the queue *before* joining: a child that has put data on a
        # multiprocessing queue does not exit until that data is flushed, so
        # joining first can time out on a child that actually finished.
        while True:
            if deadline is None:
                wait = self._POLL_INTERVAL
            else:
                remaining = deadline - monotonic()
                if remaining <= 0:
                    break
                wait = min(remaining, self._POLL_INTERVAL)
            # Sample liveness before polling. A child that exited normally has
            # already flushed its result, so an empty poll afterwards means no
            # result will ever arrive.
            child_exited = not self.process.is_alive()
            try:
                out_res = self.queue.get(timeout=wait)
            except Empty:
                if child_exited:
                    break
                continue
            have_result = True
            break

        # Decide whether this was a timeout before killing anything, instead
        # of inspecting ``exitcode`` after ``kill()`` (which races with the
        # child being reaped and could leave the caller blocked on the queue).
        timed_out = (not have_result) and self.process.is_alive()
        self._stop_process(grace=1.0 if have_result else 0.0)
        if timed_out:
            print('killed premature')
        return out_res

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Exceptions from the ``with`` body are not suppressed (returns None).
        self._stop_process()
如何使用它
# Demo of FuncTimer. It is guarded because multiprocessing's "spawn" and
# "forkserver" start methods import this module in the child process; without
# the guard every child would re-run the demo and try to start more children.
if __name__ == '__main__':
    # worker_1 finishes inside the 5 second budget, so its result is printed.
    print('testing case 1')
    with FuncTimer(fn=worker_1, args=(1, 2, 3), runtime=5) as fp:
        res = fp.start_run()
        print(res)

    # worker_2 needs longer than 5 seconds, so it is killed and None is printed.
    print('testing case 2')
    with FuncTimer(fn=worker_2, args=('ram',), runtime=5) as fp:
        res = fp.start_run()
        print(res)
这里对给定的基于线程的解决方案进行了轻微改进。
以下代码支持异常处理:
def runFunctionCatchExceptions(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and report the outcome as a tagged list.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``["RESULT", value]`` when the call returns normally, or
        ``["exception", exc]`` when it raises an ``Exception`` subclass.
        Exceptions that do not derive from ``Exception`` (for example
        ``KeyboardInterrupt``) are not caught.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as message:  # Python 3 spelling of "except Exception, message"
        return ["exception", message]
    return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs=None, timeout_duration=10, default=None):
    """Run ``func`` in a helper thread and stop waiting after a timeout.

    Args:
        func: Callable to invoke.
        args: Tuple of positional arguments for ``func``.
        kwargs: Dict of keyword arguments for ``func``; ``None`` (the
            default) means no keyword arguments.
        timeout_duration: Seconds to wait for the call to finish.
        default: Value returned when the call has not finished in time.

    Returns:
        The return value of ``func``, or ``default`` on timeout.

    Raises:
        Exception: Whatever exception ``func`` raised, re-raised in the caller.

    Note:
        The helper thread is only abandoned, not stopped: after a timeout
        ``func`` keeps running in the background.
    """
    import threading

    # Avoid a shared mutable default argument.
    call_kwargs = {} if kwargs is None else kwargs

    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default

        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **call_kwargs)

    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported name.
    if it.is_alive():
        return default
    if it.result[0] == "exception":
        raise it.result[1]
    return it.result[1]
使用5秒超时调用它:
# The wrapper defined above is named runFunctionWithTimeout, not timeout.
result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)
注意:如果在 runFunctionCatchExceptions() 中调用某些获取 GIL 的 Python 函数,这种方法就会出现问题。例如,如果在该函数中调用以下表达式,它将永远不会返回,或者需要很长时间才能返回:eval(2**9999999999**9999999999)。
请参见 https://dev59.com/SH3aa4cB1Zd3GeqPZCRf。 - Mikko Ohtamaa

这里是一个 POSIX 版本,结合了之前的许多答案,提供以下功能:
这是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
    """Signals that a call ran longer than its preset time limit."""
def quit_function(pid):
    """Send SIGTERM to the current process group and then to process ``pid``.

    Args:
        pid: Id of the process that should receive the final SIGTERM.
    """
    term_signal = signal.SIGTERM

    # Take down all subprocesses: call setpgrp(), then signal process
    # group 0 (the caller's own group).
    os.setpgrp()
    os.killpg(0, term_signal)

    # Then signal the target process so its main thread is interrupted.
    os.kill(pid, term_signal)
def handle_term(signum, frame):
    """Signal handler that aborts the running code with TerminateExecution.

    Args:
        signum: Number of the received signal (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        TerminateExecution: Always.
    """
    raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` and interrupt it after ``timeout`` seconds.

    A SIGTERM handler that raises ``TerminateExecution`` is installed for the
    duration of the call, and a background timer runs ``quit_function`` with
    this process's pid when the time is up.

    Args:
        timeout: Time limit in seconds.
        fn: Callable to run in the current thread.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns when it finishes in time.

    Raises:
        BaseException: When the call was interrupted by the timeout. The
            plain ``BaseException`` type is kept so existing handlers keep
            working; only the message now says what happened.

    Note:
        POSIX only. ``signal.signal`` may only be called from the main
        thread, so this function must be called from the main thread.
    """
    # Setting a SIGTERM handler and preparing the timer.
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    try:
        # Start the timer inside the try block so the handler is restored
        # even if the timer cannot be started.
        timer.start()
        return fn(*args, **kwargs)
    except TerminateExecution:
        raise BaseException(
            f"call timed out after {timeout} seconds"
        ) from None
    finally:
        # Cancel the timer first, then restore the original handler: the
        # other order leaves a window in which the timer can still fire
        # after the original handler is back in place.
        timer.cancel()
        signal.signal(signal.SIGTERM, old_handler)
### Test cases
def countdown(n):
    """Print ``n, n-1, ..., 0`` with a one second pause after each number.

    Args:
        n: Integer to start counting down from; nothing is counted when
            ``n`` is negative.

    Returns:
        The constant 1337.
    """
    print('countdown started', flush=True)
    for remaining in reversed(range(n + 1)):
        print(remaining, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337
def really_long_function():
    """Block the caller for ten seconds using ``time.sleep``."""
    pause_seconds = 10
    time.sleep(pause_seconds)
def really_long_function2():
    """Block the caller by running the command ``sleep 787`` via ``os.system``."""
    command = "sleep 787"
    os.system(command)
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337


def _assert_times_out(fn, *args):
    """Check that ``fn(*args)`` is interrupted by a 1 second timeout.

    The "did not time out" failure is raised from the ``else`` branch. An
    ``assert`` inside the ``try`` body would be caught by the
    ``except BaseException`` clause and could never fail the check.

    Raises:
        AssertionError: If the call returned normally, or if the
            interruption took 1.1 seconds or longer.
    """
    started = time.time()
    try:
        print(invoke_with_timeout(1, fn, *args))
    except BaseException:
        elapsed = time.time() - started
        assert elapsed < 1.1
        print("All good", elapsed)
    else:
        raise AssertionError("expected the call to be interrupted by the timeout")


# Testing various scenarios
_assert_times_out(countdown, 3)
_assert_times_out(really_long_function2)
_assert_times_out(really_long_function)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
    """Minimal mutable holder with a single ``value`` attribute."""

    def __init__(self):
        # Every instance starts out holding zero.
        self.value = 0

    def set(self, v):
        """Replace the stored value with ``v``."""
        self.value = v
# Call a bound method of `x` through invoke_with_timeout with a 2 second limit.
x = X()
invoke_with_timeout(2, x.set, 9)
# The new value must be visible on `x` itself, i.e. the call operated on this
# very object rather than on a copy of it.
assert x.value == 9
这里有一个简单易用的装饰器,如果函数执行时间超时,则返回给定的默认值,灵感来自于这个问题的第一个答案:
import signal
from functools import wraps
import time
def timeout(seconds, default=None):
    """Build a decorator that returns ``default`` when the call runs too long.

    The limit is enforced with ``SIGALRM`` and the real-time interval timer,
    so it works only on POSIX systems and only when the decorated function
    is called from the main thread.

    Args:
        seconds: Time limit in seconds; fractional values are supported.
            A value of 0 disables the timer, so no limit is enforced.
        default: Value returned in place of the result on timeout.

    Returns:
        A decorator. The decorated function returns its normal result, or
        ``default`` if the time limit expired first. A ``TimeoutError``
        raised by the function itself is propagated, not replaced.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            alarm_fired = False

            def signal_handler(signum, frame):
                nonlocal alarm_fired
                alarm_fired = True
                raise TimeoutError("Timed out!")

            # Install the handler, remembering the previous one so it can be
            # put back afterwards instead of being left replaced.
            previous_handler = signal.signal(signal.SIGALRM, signal_handler)
            try:
                # Arm the real-time timer; setitimer accepts fractional seconds.
                signal.setitimer(signal.ITIMER_REAL, seconds)
                return func(*args, **kwargs)
            except TimeoutError:
                if not alarm_fired:
                    # The function raised TimeoutError for its own reasons.
                    raise
                return default
            finally:
                # Disarm with the same API that armed the timer, then restore
                # the handler. signal.signal() returns None for a handler
                # that was not installed from Python; that cannot be
                # re-installed, so it is skipped.
                signal.setitimer(signal.ITIMER_REAL, 0)
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)
        return wrapper
    return decorator
@timeout(0.2, default="Timeout!")
def long_function_call(meal):
    """Sleep for 3 seconds, then report that ``meal`` is ready.

    Decorated with a 0.2 second limit and the default value "Timeout!".
    """
    time.sleep(3)
    message = f"I have executed fully, {meal} is ready"
    return message
@timeout(1.3, default="Timeout!")
def less_long_function_call(meal):
    """Sleep for 1 second, then report that ``meal`` is ready.

    Decorated with a 1.3 second limit and the default value "Timeout!".
    """
    time.sleep(1)
    message = f"I have executed fully, {meal} is ready"
    return message
# long_function_call sleeps 3 seconds under a 0.2 second limit.
result = long_function_call("bacon")
print(result) # Prints "Timeout!" if the function execution exceeds 0.2 seconds
# less_long_function_call sleeps 1 second under a 1.3 second limit.
result = less_long_function_call("bacon")
print(result) # Prints "Timeout!" if the function execution exceeds 1.3 seconds