Python中线程池中每个线程的超时时间

6

我正在使用Python 2.7。

目前我是这样使用ThreadPoolExecuter的:

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

问题在于f有时运行时间太长。每当运行f时,我希望将其运行限制为100秒,然后终止它。
最终,对于param中的每个元素x,我想知道是否必须终止f,以及如果没有终止-返回值是什么。 即使f对于一个参数超时,我仍然希望用下一个参数运行它。 executer.map方法确实有一个timeout参数,但它为整个运行设置了一个超时时间,从调用executer.map的时间开始,而不是分别针对每个线程。
获取所需行为的最简单方法是什么?

1
在Python中没有直接终止线程的方法。如果您传递给maptimeout到期,它实际上不会终止执行器线程,它只会使内部进行的future.result(timeout)调用引发TimeoutError异常。然而,工作线程将继续在后台运行。如果您需要实际终止线程,则需要让您的工作函数检查父进程可以在超时后设置的某种标志。但是,这可能不容易实现,这取决于工作函数正在执行什么操作。 - dano
@dano:我明白了。进程仍在后台运行可能是我可以接受的。但是,假设处理params[4]的线程被卡住了,我是否仍然可以获取处理params[5]到params[9]的进程结果? - user302099
@user302099:如果您想在params[4]之前获取params[5]的结果,可以使用as_completed()而不是map()。如果您使用线程,则函数应该合作(遵守退出条件)。如果您不能依赖函数的行为,则请使用进程。 - jfs
1个回答

9
这个答案涉及到 Python 的 multiprocessing 库,通常比 threading 库更可取,除非你的函数只是在等待网络调用。请注意,multiprocessing 和 threading 库具有相同的接口。
考虑到每个进程可能运行100秒,与创建每个进程的开销相比,这个开销非常小。您可能需要创建自己的进程以获得必要的控制。
一种选项是将 f 包装在另一个函数中,该函数最多执行 100 秒:
from multiprocessing import Pool

def timeout_f(arg):
    pool = Pool(processes=1)
    return pool.apply_async(f, [arg]).get(timeout=100)

那么你的代码就变成了:

    result = list(executor.map(timeout_f, params))

或者,您可以编写自己的线程/进程控制:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
    for p in five_processes:
        p.start()
    time_waited = 0
    start = time()
    for p in five_processes:
        if time_waited >= 100:
            p.join(0)
            p.terminate()
        p.join(100 - time_waited)
        p.terminate()
        time_waited = time() - start
    for p in five_processes:
        exit_codes.append(p.exit_code)

你可以通过像 Can I get a return value from multiprocessing.Process? 这样的方法获取返回值。 如果进程完成,则进程的退出码为0,否则为非零数。 参考自:Join a group of python processes with a timeout, How do you split a list into evenly sized chunks?
另外一种选择是,在multiprocessing.Pool上尝试使用apply_async。
from multiprocessing import Pool, TimeoutError
from time import sleep    

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            result.append(process.get(timeout=100))
        except TimeoutError as e:
            results.append(e)

请注意,以上每个进程可能需要等待超过100秒的时间,例如如果第一个进程需要50秒才能完成,则第二个进程将在其运行时间中多出50秒。需要更复杂的逻辑(例如前面的示例)来强制执行更严格的超时限制。

第一种解决方案会强制让你等待100秒,即使所有进程在5秒内都完成了。你可能想要一个循环,每隔几秒钟休眠一次,然后检查是否仍有任何进程在运行,如果有,则继续休眠。 - dano
@dano 是的,我正在快速回答。已更新为使用更好的逻辑。 - Zags
看起来你在编辑时出现了某种复制/粘贴错误。缩进有误,而且你两次调用了 jointerminate - dano
为了使用第一种解决方案,您可以在包含第二个池的函数上调用映射,但是该池需要是非守护进程:https://dev59.com/EGw05IYBdhLWcg3w3lol - Chris Lucian
@Zags 选项1完美地运作着。我有一个疑问,如果pool.apply_async(f, [arg]).get(timeout=100)按预期运行以在超时后终止进程,那么我们为什么需要使用ThreadPoolExecutor来创建线程。相反,可以仅使用多进程池。 - Darknight

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接