嵌套使用concurrent.futures.ThreadPoolExecutor

27

我有一个程序,目前正在使用concurrent.futures.ThreadPoolExecutor并发地运行多个任务。这些任务通常是I/O绑定的,涉及对本地数据库和远程REST API的访问。但是,这些任务本身也可以分成子任务,这些子任务也会受益于并发。

我希望使用concurrent.futures.ThreadPoolExecutor在任务内部是安全的。我编写了一个玩具示例,似乎可以正常工作:

import concurrent.futures


def inner(i, j):
    return i, j, i**j


def outer(i):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(inner, i, j): j for j in range(5)}
        results = []
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(outer, i): i for i in range(10)}
        results = []
        for future in concurrent.futures.as_completed(futures):
            results.extend(future.result())
    print(results)


if __name__ == "__main__":
    main()
尽管这个玩具示例似乎可以工作,但我希望有些信心证明这是有意的。我希望它是有意的,因为否则在使用执行器执行任意代码时将不安全,以防它还使用concurrent.futures来利用并发性。

嗯,我认为你应该避免fork-bomb。在子线程之前和之后,你有没有考虑过花费的时间? - cgte
这个答案对我也很有帮助 https://dev59.com/YMLra4cB1Zd3GeqPMI49 - rtviii
1个回答

8

从其他线程中生成线程没有任何问题。你的情况也不例外。

不过,随着时间的推移,生成线程的开销会变得相当高,生成更多的线程实际上会导致软件变慢。

我强烈建议使用像 asyncio 这样美妙地处理任务异步的库。它通过使用一个具有非阻塞 io 的线程来实现。结果可能会比正常线程更快,因为开销要小得多。

如果你不想使用 asyncio,为什么不在主函数中创建另一个池执行器,并将其传递给 outer() 函数呢?这样,你将最多只有 10 个(2x5)线程,而不是 25 个(5x5),这样更加合理。

你不能将调用 outer() 的相同 main() 执行器传递给 outer(),因为这可能会导致死锁(每个 outer() 在调度 inner() 前都等待另一个 outer() 完成)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接