函数调用超时

510

我在调用Python中的一个函数,但我知道这个函数可能会阻塞并迫使我重新启动脚本。

我该如何调用这个函数或者将其包装起来,以便如果它运行时间超过5秒钟,脚本就能取消它并执行其他操作?


5
这个库看起来被维护得很好:https://pypi.org/project/wrapt-timeout-decorator/ - guettli
24个回答

0

我的做法是同时使用线程和进程:如果任务没有在规定时间内完成,就终止运行它的进程。

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a, b, c):
    """Simulate a short, slow job and return the sum of its three arguments.

    Prints a progress line and sleeps for one second, twice, before
    returning ``a + b + c``.
    """
    rounds_left = 2
    while rounds_left:
        print('very time consuming sleep')
        sleep(1)
        rounds_left -= 1

    total = a + b + c
    return total

# test case 2
def worker_2(in_name):
    """Simulate a long, slow job and return a greeting for ``in_name``.

    Prints a progress line and sleeps for one second, ten times, before
    returning ``'hello '`` concatenated with ``in_name``.
    """
    rounds_left = 10
    while rounds_left:
        print('very time consuming sleep')
        sleep(1)
        rounds_left -= 1

    greeting = 'hello ' + in_name
    return greeting

下面是实际的类,它可以作为上下文管理器使用:

class FuncTimer():
    """Run ``fn(*args)`` in a child process, giving up after ``runtime`` seconds.

    Intended to be used as a context manager::

        with FuncTimer(fn=work, args=(1, 2), runtime=5) as timer:
            result = timer.start_run()

    The function runs in a separate ``multiprocessing.Process`` so that it
    can be killed if it overruns; its return value travels back to the parent
    through a ``multiprocessing.Queue``.
    """

    def __init__(self, fn, args, runtime):
        """Prepare (but do not start) the child process.

        Args:
            fn: Callable to execute in the child process.
            args: Positional arguments passed to ``fn``.
            runtime: Maximum number of seconds to wait for a result.
        """
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        """Child-process entry point: run ``fn`` and publish its return value."""
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            # future.result() re-raises anything fn raised, in which case
            # nothing is put on the queue.
            self.queue.put(future.result())

    def __enter__(self):
        return self

    def start_run(self):
        """Start the child process and wait up to ``runtime`` seconds for its result.

        Returns:
            The value returned by ``fn``, or ``None`` if no result arrived
            within ``runtime`` seconds. A child that dies without producing a
            result (for instance because ``fn`` raised) is reported the same
            way once ``runtime`` has elapsed.
        """
        # Local import keeps the class self-contained; the name does not clash
        # with the ``self.queue`` attribute.
        from queue import Empty

        self.process.start()
        try:
            # Read the result *before* joining: a child that has put data on a
            # multiprocessing.Queue may not exit until that data is consumed,
            # so joining first can deadlock. Waiting on the queue with a
            # timeout also avoids guessing the child's state from exitcode
            # right after kill().
            out_res = self.queue.get(timeout=self.runtime)
        except Empty:
            out_res = None
            print('killed premature')
        finally:
            self._stop_process()
        return out_res

    def _stop_process(self):
        """Kill the child if it is still running and reap it; no-op if never started."""
        if self.process.pid is None:
            # start() was never called; kill()/join() would raise here.
            return
        if self.process.is_alive():
            self.process.kill()
        self.process.join()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._stop_process()

如何使用它

# The demo starts child processes, so it must only run when this file is
# executed as a script: with the "spawn"/"forkserver" start methods the child
# re-imports the main module, and unguarded top-level code would run again
# inside every child.
if __name__ == '__main__':
    # worker_1 needs about 2 seconds, which fits in the 5 second budget.
    print('testing case 1')
    with FuncTimer(fn=worker_1, args=(1, 2, 3), runtime=5) as fp:
        res = fp.start_run()
        print(res)

    # worker_2 needs about 10 seconds, so it exceeds the 5 second budget.
    print('testing case 2')
    with FuncTimer(fn=worker_2, args=('ram',), runtime=5) as fp:
        res = fp.start_run()
        print(res)

0

这里对给定的基于线程的解决方案进行了轻微改进。

以下代码支持异常处理

def runFunctionCatchExceptions(func, *args, **kwargs):
    """Call ``func`` and report its outcome as a tagged two-item list.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``["RESULT", value]`` when ``func`` returns normally, or
        ``["exception", exc]`` when it raises an ``Exception`` subclass.
    """
    try:
        result = func(*args, **kwargs)
    # Python 3 spelling; the old ``except Exception, message`` form is a
    # SyntaxError on Python 3.
    except Exception as message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs=None, timeout_duration=10, default=None):
    """Run ``func`` in a helper thread and wait at most ``timeout_duration`` seconds.

    Args:
        func: Callable to invoke.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``; ``None`` means no keyword
            arguments.
        timeout_duration: Seconds to wait for ``func`` to finish.
        default: Value returned when ``func`` has not finished in time.

    Returns:
        The return value of ``func``, or ``default`` on timeout.

    Raises:
        Exception: Whatever ``func`` raised, re-raised in the calling thread.

    Note:
        The helper thread is not stopped on timeout; it keeps running in the
        background until ``func`` returns.
    """
    import threading

    # Avoid a shared mutable default argument.
    call_kwargs = {} if kwargs is None else kwargs

    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            # A worker that never returns must not keep the interpreter alive
            # at shutdown.
            self.daemon = True
            self.result = default

        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **call_kwargs)

    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported name.
    if it.is_alive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

使用5秒超时调用它:

# The helper defined in this answer is runFunctionWithTimeout; remote_calculate
# and myarg stand in for the caller's own function and argument.
result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)

1
这将引发一个新的异常,隐藏原始的回溯跟踪。请参见下面的版本... - Meitham
1
这也是不安全的:如果在 runFunctionCatchExceptions() 中调用了某些会一直持有 GIL 的 Python 函数,例如 eval(2**9999999999**9999999999),那么调用将永远不会返回,或者需要很长时间才能返回。请参见https://dev59.com/SH3aa4cB1Zd3GeqPZCRf。 - Mikko Ohtamaa

0

这里是一个 POSIX 版本,结合了之前的许多答案,提供以下功能:

  1. 能够处理阻塞执行的子进程。
  2. 可以对类的成员函数使用超时函数。
  3. 对终止时间有严格的要求。

这是代码和一些测试用例:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """Signals that a call ran longer than its allotted running time."""


def quit_function(pid):
    """Send SIGTERM to the current process group and then to process ``pid``.

    Used as the ``threading.Timer`` callback of ``invoke_with_timeout``, which
    passes the PID of the process that started the timer.

    Args:
        pid: ID of the process that should receive the final SIGTERM.
    """
    # Put the calling process into its own process group, then signal that
    # whole group (pgid 0 means "the caller's group", which includes this
    # process itself).
    # NOTE(review): children started before the first setpgrp() call presumably
    # remain in the previous group and would not be signalled — confirm.
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Signal the target process explicitly as well.
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    """Signal handler that turns the received signal into ``TerminateExecution``."""
    raise TerminateExecution


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` and abort it if it runs longer than ``timeout``.

    A ``threading.Timer`` fires ``quit_function`` after ``timeout`` seconds,
    which delivers SIGTERM to this process; the temporary SIGTERM handler
    turns that into ``TerminateExecution`` inside the running call.

    Args:
        timeout: Time budget in seconds.
        fn: Callable to run in the current thread.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        TimeoutError: ``fn`` was interrupted because the budget ran out.
            Any other exception raised by ``fn`` propagates unchanged.

    Note:
        ``signal.signal`` may only be called from the main thread, and the
        process-group calls in ``quit_function`` are POSIX-only.
    """
    # Install the SIGTERM handler and prepare the watchdog timer.
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])

    timer.start()
    try:
        return fn(*args, **kwargs)
    except TerminateExecution:
        # Surface a descriptive, catchable error instead of a bare
        # BaseException placeholder; "from None" hides the internal signal
        # exception from the traceback.
        raise TimeoutError(
            f"{getattr(fn, '__name__', fn)!r} did not finish within {timeout} seconds"
        ) from None
    finally:
        # Always restore the previous handler and stop the watchdog.
        # signal.signal() returns None when the earlier handler was not
        # installed from Python; that value cannot be passed back in.
        if old_handler is not None:
            signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

### Test cases
def countdown(n):
    """Print ``n, n-1, ..., 0`` one number per second and return 1337.

    Each number is flushed immediately and followed by a one-second sleep;
    a negative ``n`` prints no numbers at all.
    """
    print('countdown started', flush=True)
    for remaining in reversed(range(n + 1)):
        print(remaining, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    """Block the calling thread for ten seconds (fixture for the timeout tests)."""
    seconds = 10
    time.sleep(seconds)


def really_long_function2():
    """Block on an external shell ``sleep`` command (fixture for the timeout tests)."""
    command = "sleep 787"
    os.system(command)


# A call that finishes inside its budget must return its normal result.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Each scenario below needs far longer than the 1 second budget, so the call
# must be aborted; the handler then checks that the abort happened promptly.
# The scenarios run in this fixed order.
for scenario, scenario_args in (
    (countdown, (3,)),
    (really_long_function2, ()),
    (really_long_function, ()),
):
    t1 = time.time()
    try:
        print(invoke_with_timeout(1, scenario, *scenario_args))
        assert(False)
    except BaseException:
        assert(time.time() - t1 < 1.1)
        print("All good", time.time() - t1)

# Do not leave the loop helpers behind as module-level names.
del scenario, scenario_args

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    """Tiny mutable holder with a single ``value`` attribute, initially 0."""

    def __init__(self):
        self.value = 0

    def set(self, v):
        """Store ``v`` as the holder's current value."""
        self.value = v


# Run a bound method through the timeout wrapper. The assertion passes only
# if the call mutated this very instance rather than a copy of it.
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

0

这里有一个简单易用的装饰器,如果函数执行时间超时,则返回给定的默认值,灵感来自于这个问题的第一个答案

import signal
from functools import wraps
import time

def timeout(seconds, default=None):
    """Build a decorator that limits a function's run time via SIGALRM.

    Args:
        seconds: Time budget in seconds; fractional values are supported
            because the timer is armed with ``signal.setitimer``. A value of
            0 disarms the timer, i.e. no limit is enforced.
        default: Value returned by the decorated function when the budget is
            exceeded.

    Returns:
        A decorator. The wrapped function returns its normal result, or
        ``default`` if a ``TimeoutError`` is raised while it runs (by the
        alarm handler or by the function itself).

    Note:
        Relies on ``signal.SIGALRM`` / ``signal.setitimer``, which are only
        available on Unix, and ``signal.signal`` may only be called from the
        main thread.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def signal_handler(signum, frame):
                raise TimeoutError("Timed out!")

            # Remember the handler that was active so it can be put back.
            previous_handler = signal.signal(signal.SIGALRM, signal_handler)
            try:
                # Arm a one-shot real-time timer for the full (possibly
                # fractional) duration.
                signal.setitimer(signal.ITIMER_REAL, seconds)
                return func(*args, **kwargs)
            except TimeoutError:
                return default
            finally:
                # Disarm with the same API that armed the timer, then restore
                # the caller's handler so the decorator leaves no global state
                # behind. signal.signal() returns None for handlers that were
                # not installed from Python; those cannot be re-installed.
                signal.setitimer(signal.ITIMER_REAL, 0)
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator

@timeout(0.2, default="Timeout!")
def long_function_call(meal):
    """Sleep for three seconds, then report that ``meal`` is ready."""
    time.sleep(3)
    message = f"I have executed fully, {meal} is ready"
    return message

@timeout(1.3, default="Timeout!")
def less_long_function_call(meal):
    """Sleep for one second, then report that ``meal`` is ready."""
    time.sleep(1)
    message = f"I have executed fully, {meal} is ready"
    return message

# The function sleeps 3 s against a 0.2 s limit, so the default is returned.
result = long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 0.2 seconds
# The function sleeps 1 s against a 1.3 s limit, so it completes normally.
result = less_long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 1.3 seconds

网页内容由 Stack Overflow 提供,点击上面的链接
可以查看英文原文,
原文链接