Python线程超时上下文管理器

12
我有一个上下文管理器 timeout,在信号方面完美运作,但在多线程模式下会引发错误,因为信号只在主线程中工作。
def timeout_handler(signum, frame):
    raise TimeoutException()

@contextmanager
def timeout(seconds):
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

我看到了 timeout 的装饰器实现,但我不知道如何将 yield 传递给继承自 threading.Thread 的类。我的方法行不通。
@contextmanager
def timelimit(seconds):
    class FuncThread(threading.Thread):
        def run(self):
            yield

    it = FuncThread()        
    it.start()
    it.join(seconds)

    if it.isAlive():
        raise TimeoutException()
5个回答

16
如果被上下文管理器保护的代码是基于循环的,请考虑处理方式与人们处理线程终止的方式相同。终止另一个线程通常是不安全的,因此标准方法是使控制线程设置一个对工作线程可见的标志。工作线程定期检查该标志并清理地关闭自身。以下是使用超时进行类似操作的方法:
class timeout(object):
    def __init__(self, seconds):
        self.seconds = seconds
    def __enter__(self):
        self.die_after = time.time() + self.seconds
        return self
    def __exit__(self, type, value, traceback):
        pass
    @property
    def timed_out(self):
        return time.time() > self.die_after

这是一个单线程使用示例:

with timeout(1) as t:
    while True: # this will take a long time without a timeout
        # periodically check for timeouts
        if t.timed_out:
            break # or raise an exception
        # do some "useful" work
        print "."
        time.sleep(0.2)

和一个多线程的:

import thread
def print_for_n_secs(string, seconds):
    with timeout(seconds) as t:
        while True:
            if t.timed_out:
                break # or raise an exception
            print string,
            time.sleep(0.5)

for i in xrange(5):
    thread.start_new_thread(print_for_n_secs,
                            ('thread%d' % (i,), 2))
    time.sleep(0.25)

这种方法比使用信号更加侵入,但它适用于任意线程。


这是一种可能的方法,但不如我所希望的那样简短和清晰。你的变体需要像装饰器一样将代码包装起来,但这对我来说是一种新的方法,所以我给了你赏金。谢谢。 - San4ez

3

我看不出使用上下文管理器能够实现您提出的需求,因为您不能将流从一个线程中yield给另一个线程。我会使用带有超时的可中断线程来包装您的函数。这里是一个示例代码

您将会有一个额外的线程,语法可能不如意,但它可以正常工作。


2
请注意,该配方描述的可中断线程实际上并没有被中断,而是继续运行。据我所知,没有可靠的方法来中断非主要Python线程。 - Lethargy

1
我知道现在已经很晚了,但我刚刚才看到这个问题。不过,你考虑过创建自己的信号器/上下文管理器吗?我是Python新手,希望有经验的开发人员能对这个实现提供反馈。
这是基于“Mr Fooz”的答案。
class TimeoutSignaller(Thread):
    def __init__(self, limit, handler):
        Thread.__init__(self)
        self.limit = limit
        self.running = True
        self.handler = handler
        assert callable(handler), "Timeout Handler needs to be a method"

    def run(self):
        timeout_limit = datetime.datetime.now() + datetime.timedelta(seconds=self.limit)
        while self.running:
            if datetime.datetime.now() >= timeout_limit:
                self.handler()
                self.stop_run()
                break

    def stop_run(self):
        self.running = False

class ProcessContextManager:
    def __init__(self, process, seconds=0, minutes=0, hours=0):
        self.seconds = (hours * 3600) + (minutes * 60) + seconds
        self.process = process
        self.signal = TimeoutSignaller(self.seconds, self.signal_handler)

    def __enter__(self):
        self.signal.start()
        return self.process

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.signal.stop_run()

    def signal_handler(self):
        # Make process terminate however you like
        # using self.process reference
        raise TimeoutError("Process took too long to execute")

使用情况:
with ProcessContextManager(my_proc) as p:
    # do stuff e.g.
    p.execute()

-1

与Mr Fooz类似的实现,但使用contextlib库:

import time
from contextlib import contextmanager

@contextmanager
def timeout(seconds):
    """
    A simple context manager to enable timeouts.

    Example:

        with timeout(5) as t:
            while True:
                if t():
                    # handle
    """
    stop = time.time() + seconds
    def timed_out():
        return time.time() > stop

    yield timed_out

-3

系统调用的超时是通过信号完成的。当发生信号时,大多数阻塞系统调用都会返回EINTR,因此您可以使用alarm来实现超时。

这里有一个上下文管理器,适用于大多数系统调用,如果阻塞系统调用时间过长,则会引发IOError异常。

import signal, errno
from contextlib import contextmanager
import fcntl

@contextmanager
def timeout(seconds):
    def timeout_handler(signum, frame):
        pass

    original_handler = signal.signal(signal.SIGALRM, timeout_handler)

    try:
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, original_handler)

with timeout(1):
    f = open("test.lck", "w")
    try:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
    except IOError, e:
        if e.errno != errno.EINTR:
            raise e
        print "Lock timed out"

就像我的第一个变量一样,在original_handler = signal.signal(signal.SIGALRM, timeout_handler)这一行中,我得到了ValueError: signal only works in main thread的错误。 - San4ez
1
正如楼主所述,信号只在主线程中起作用。楼主需要另一种解决方案。 - Martijn Pieters

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接