在concurrent.futures中检测失败的任务

14

我一直在使用concurrent.futures,因为它具有简单的接口并且可以让用户轻松控制线程/进程的最大数量。然而,似乎concurrent.futures隐藏了失败的任务,并在所有任务完成/失败后继续主线程。

import concurrent.futures

def f(i):
    return (i + 's')

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    fs = [executor.submit(f, i ) for i in range(10)]
    concurrent.futures.wait(fs)

调用函数f时,无论输入的整数是什么,都会导致TypeError。然而,整个脚本都能够正常运行并以代码0退出。是否有任何方法可以在任何线程失败时抛出异常/错误?

或者,有没有更好的方法来限制线程/进程的数量而不使用concurrent.futures?


相关链接:https://dev59.com/2lwX5IYBdhLWcg3wvRgf - Ciro Santilli OurBigBook.com
2个回答

16

concurrent.futures.wait()函数可以确保所有任务都已完成,但它不会检查成功(即返回)和失败(即在工作函数中引发并未被捕获的异常)之间的区别。要做到这一点,您需要对每个Future调用.result()方法(这将导致它重新引发任务中的异常或产生返回值)。还有其他方法可以在主线程中检查而不实际引发异常(例如.exception()),但.result()是最直接的方法。

如果您想使其重新引发异常,最简单的方法是将wait()调用替换为:

for fut in concurrent.futures.as_completed(fs):
    fut.result()

该方法将处理结果作为Future完成,并在发生异常时及时引发Exception。或者您可以继续使用wait,以便所有任务都完成后再检查其中任何一个的异常,然后直接迭代fs并对每个调用.result()


0

使用multiprocessing.Pool(用于进程)或multiprocessing.pool.ThreadPool(用于线程)也可以以同样的方式完成。据我所知,它会重新抛出任何捕获的异常。


这个问题涉及到 concurrent.futures。据我所知,这是 multiprocessing 模块的更现代实现。对我来说,建议使用旧库毫无意义。 - guettli
1
@guettli,这个旧库仍然被许多并行库(例如Joblib和Dask)积极使用。而且,未来对象在功能上与具有其“map”函数的Pool不等效-后者可能比等待多个未来对象更有效率。 - Anton
是的,许多库都积极使用多进程。但是这个问题的标题是:“在concurrent.futures中检测失败的任务”。 - guettli
1
是的。但这是否意味着我们不能向 OP 的想法提出任何替代方案?有时回答问题没有其他途径,只能重定向到其他事物。 - Anton
据我所知,concurrent.futures更为现代化。我知道multiprocessing并未被弃用,但我不建议回到过去。当然,有时候重定向到其他内容非常有帮助。 - guettli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接