如何使用ThreadPoolExecutor进行递归遍历目录?

6

我的真正任务是使用paramiko和多线程递归遍历远程目录。为了简单起见,我只是用本地文件系统来演示它:

from pathlib import Path
from typing import List
from concurrent.futures import ThreadPoolExecutor, Executor

def listdir(root: Path, executor: Executor) -> List[Path]:
    if root.is_dir():
        xss = executor.map(lambda d: listdir(d, executor), root.glob('*'))
        return sum(xss, [])
    return [root]

with ThreadPoolExecutor(4) as e:
    listdir(Path('.'), e)

然而,以上代码一直在运行。

我的代码有什么问题?如何修复它(最好使用Executor而不是原始的Thread)?

编辑:我通过以下代码确认了@Sraw的答案:

In [4]: def listdir(root: Path, executor: Executor) -> List[Path]:
   ...:     print(f'Enter {root}', flush=True)
   ...:     if root.is_dir():
   ...:         xss = executor.map(lambda d: listdir(d, executor), root.glob('*'))
   ...:         return sum(xss, [])
   ...:     return [root]
   ...:

In [5]: with ThreadPoolExecutor(4) as e:
   ...:     listdir(Path('.'), e)
   ...:
Enter .
Enter NonRestrictedShares
Enter corporateActionData
Enter RiskModelAnnualEPS
Enter juyuan
1个回答

4

你的代码中存在死锁。

由于你正在使用 ThreadPoolExecutor(4),这个执行器中只有四个工作线程,因此你不能同时运行超过四个任务。

想象一下以下最简单的结构:

test
----script.py
----test1
--------test2
------------test3
----------------test4
--------------------test5

如果运行python script.py,第一个工作线程处理test1,第二个线程处理test1/test2,第三个线程处理test1/test2/test3,第四个线程处理test1/test2/test3/test4。现在工作线程已经用完了。但是又有另一个任务test1/test2/test3/test4/test5插入到工作队列中。

因此它将永远挂起。


谢谢,有没有简单的方法可以修复我的代码?我在想是否可以使用 Executor.submit 而不是 Executor.map 来避免死锁。 - Eastsun
2
不,submit并不能解决这个问题。但我认为你只能在顶层目录使用executor,以便所有任务都是独立的。这不是一个完美的解决方案,但它是最简单的一个,并且我认为对于大多数情况来说也足够好了。 - Sraw

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接