这个答案涉及到 Python 的 multiprocessing 库,通常比 threading 库更可取,除非你的函数只是在等待网络调用。请注意,multiprocessing 和 threading 库具有相同的接口。
考虑到每个进程可能运行100秒,与创建每个进程的开销相比,这个开销非常小。您可能需要创建自己的进程以获得必要的控制。
一种选项是将 f 包装在另一个函数中,该函数最多执行 100 秒:
from multiprocessing import Pool
def timeout_f(arg):
pool = Pool(processes=1)
return pool.apply_async(f, [arg]).get(timeout=100)
那么你的代码就变成了:
result = list(executor.map(timeout_f, params))
或者,您可以编写自己的线程/进程控制:
from multiprocessing import Process
from time import time
def chunks(l, n):
""" Yield successive n-sized chunks from l. """
for i in xrange(0, len(l), n):
yield l[i:i+n]
processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes = chunks(processes, 5):
for p in five_processes:
p.start()
time_waited = 0
start = time()
for p in five_processes:
if time_waited >= 100:
p.join(0)
p.terminate()
p.join(100 - time_waited)
p.terminate()
time_waited = time() - start
for p in five_processes:
exit_codes.append(p.exit_code)
你可以通过像
Can I get a return value from multiprocessing.Process? 这样的方法获取返回值。
如果进程完成,则进程的退出码为0,否则为非零数。
参考自:
Join a group of python processes with a timeout,
How do you split a list into evenly sized chunks?
另外一种选择是,在
multiprocessing.Pool上尝试使用apply_async。
from multiprocessing import Pool, TimeoutError
from time import sleep
if __name__ == "__main__":
pool = Pool(processes=5)
processes = [pool.apply_async(f, [i]) for i in params]
results = []
for process in processes:
try:
result.append(process.get(timeout=100))
except TimeoutError as e:
results.append(e)
请注意,以上每个进程可能需要等待超过100秒的时间,例如如果第一个进程需要50秒才能完成,则第二个进程将在其运行时间中多出50秒。需要更复杂的逻辑(例如前面的示例)来强制执行更严格的超时限制。
map
的timeout
到期,它实际上不会终止执行器线程,它只会使内部进行的future.result(timeout)
调用引发TimeoutError
异常。然而,工作线程将继续在后台运行。如果您需要实际终止线程,则需要让您的工作函数检查父进程可以在超时后设置的某种标志。但是,这可能不容易实现,这取决于工作函数正在执行什么操作。 - danoparams[4]
之前获取params[5]
的结果,可以使用as_completed()
而不是map()
。如果您使用线程,则函数应该合作(遵守退出条件)。如果您不能依赖函数的行为,则请使用进程。 - jfs