How do I run os.walk in parallel in Python?


I wrote a simple application in Java that takes a list of paths and generates a file containing all the file paths found under that original list.

If I have a file called paths.txt containing:

c:\folder1\
c:\folder2\
...
...
c:\folder1000\

My application runs a recursive function on each path and returns a file containing the paths of all the files under those folders.

Now I want to write this application in Python.

I have written a simple application that uses os.walk() to traverse a given folder and print the file paths to the output.

Now I want to run it in parallel, and I know Python has modules for that: multithreading and multiprocessing.

What is the best way to do this, and how do I implement it?
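
For reference, a minimal serial version of the walker described above might look like the sketch below. This is an assumed reconstruction, not the asker's actual code; it relies only on os.walk() and the paths.txt format shown in the question.

import os

# read the top-level folders from paths.txt, one whitespace-separated entry each
with open('paths.txt') as f:
    roots = f.read().split()

# walk each root serially and print every file path found under it
for root in roots:
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            print(os.path.join(dirpath, name))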


What do you expect to gain from a multithreaded traversal? Since the work is I/O-bound, a single-threaded traversal runs about as fast as two threads sharing the I/O load... so what do you hope to win by walking with multiple threads? - user2665694
When I run the same program in Java, the multithreaded version is faster. Some of the paths are network paths, so running with multiple threads probably optimizes the network traffic. - user1251654
The multiprocessing module is a better fit here than the threading module. - user2665694
@AndreasJung: I agree, but how would you do it? - static_rtti
One problem with Python's os.walk is that with directories containing many files it pulls in a whole directory's worth of information at a time, which reads a lot into memory. I've found that switching to a recursive os.listdir generator works better than os.walk. - woot
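
A minimal sketch of the recursive os.listdir() generator suggested in the last comment (an assumed illustration, not the commenter's code): it yields one file path at a time instead of handing back the per-directory lists that os.walk() builds.

import os

def iter_files(root):
    # yield file paths under root, depth-first, one entry at a time
    for name in os.listdir(root):
        full = os.path.join(root, name)
        if os.path.isdir(full):
            yield from iter_files(full)   # recurse into subdirectories
        else:
            yield full

# usage: for path in iter_files('c:\\folder1'): print(path)
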
3 Answers


Here is a multiprocessing solution:

from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os

def explore_path(path):
    directories = []
    nondirectories = []
    for filename in os.listdir(path):
        fullname = os.path.join(path, filename)
        if os.path.isdir(fullname):
            directories.append(fullname)
        else:
            nondirectories.append(filename)
    outputfile = path.replace(os.sep, '_') + '.txt'
    with open(outputfile, 'w') as f:
        for filename in nondirectories:
            print(filename, file=f)
    return directories

def parallel_worker():
    while True:
        path = unsearched.get()
        dirs = explore_path(path)
        for newdir in dirs:
            unsearched.put(newdir)
        unsearched.task_done()

# acquire the list of paths
with open('paths.txt') as f:
    paths = f.read().split()

unsearched = Queue()
for path in paths:
    unsearched.put(path)

with Pool(5) as pool:
    for i in range(5):
        pool.apply_async(parallel_worker)

    # block until every queued directory has been processed,
    # while the worker pool is still alive
    unsearched.join()
    print('Done')

This parallelizes the work you do per directory, but not the walk itself. - static_rtti
Why does that matter? The point of parallelizing is to speed up the overall task. Fine-grained parallelization is usually more expensive than coarse-grained. Besides, it would require breaking the *os.walk()* abstraction and rewriting some of its code (giving up the benefit of using the library in the first place). Also, there isn't much to gain from parallelizing the traversal itself, since it is an I/O-bound operation. - Raymond Hettinger
@static_rtti One more thought. The OP mentions in the comments that he is using multiple network drives that can run in parallel. So the speed-up comes from running several walk tasks at the same time. There is no benefit in parallelizing a single traversal, because the read/write arm on one drive can only do one thing at a time. - Raymond Hettinger
@static_rtti, you are right that whether parallelizing the walk itself pays off depends on the situation, but from experience, coarse-grained parallelization of multiple walk tasks over the network on different drives will almost certainly give a speed-up. Have you ever run "du -s *" or "find ..." across several network drives? Those commands never saturate your network interface, and they run completely independently through different systems (e.g. NFS connections). - Dr. Jan-Philip Gehrcke
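
A hedged sketch of that coarse-grained idea (not code from this thread): keep os.walk() intact and run one complete walk per top-level path, e.g. one per network drive, so independent drives can overlap their I/O waits. It assumes the same paths.txt input file as in the question and uses the standard-library concurrent.futures module.

import os
from concurrent.futures import ThreadPoolExecutor

def walk_one_root(root):
    # walk a single root serially and collect every file path under it
    found = []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            found.append(os.path.join(dirpath, name))
    return found

with open('paths.txt') as f:
    roots = f.read().split()

# one walk per root; walks on different drives overlap their I/O waits
with ThreadPoolExecutor(max_workers=len(roots) or 1) as pool:
    for root, files in zip(roots, pool.map(walk_one_root, roots)):
        print(root, len(files), 'files')
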
os.walk() yields once before it recurses. The break ensures it never reaches the recursion phase. The code is correct. If that bothers you, use os.path.isdir() to split os.listdir() into directories and files manually; that is what os.walk() does internally. The OP's question has been answered fairly. Now it's your turn to do your part. - Raymond Hettinger


Here is a threading pattern I have used in Python; given how CPython threads work, it may not actually add any performance.

import threading
import queue
import os

class PathThread (threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print(path + "/" + f)

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()

# threadsafe queue
pathqueue = queue.Queue()
paths = ["foo", "bar", "baz"]

# spawn threads
for i in range(0, 5):
    t = PathThread(pathqueue)
    t.daemon = True
    t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()

Taking a threading approach to parallelism is likely to be counterproductive. Python's Global Interpreter Lock prevents any two threads from running at the same time. - Raymond Hettinger
Not for I/O operations, though that isn't the point @RaymondHettinger is making anyway. - Veedrac
Sorry, but in this case your statement is plainly wrong. You are repeating the big lie about the GIL. Blocking operations and I/O are not constrained by the GIL, and there are plenty of cases where processing happens outside the GIL. Don't get me wrong, the GIL is bad, but it does not mean multithreading in Python is worthless. - woot
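
To make the point about blocking I/O concrete, here is a small sketch (not from this thread): CPython releases the GIL while a thread waits inside a blocking system call such as os.stat(), so a thread pool can overlap many such calls even though only one thread runs Python bytecode at a time.

import os
from concurrent.futures import ThreadPoolExecutor

def stat_many(paths, max_workers=8):
    # stat every path concurrently; the threads spend most of their
    # time blocked in the OS, where the GIL is not held
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(paths, pool.map(os.stat, paths)))

# usage: sizes of everything in the current directory
for path, st in stat_many(os.listdir('.')).items():
    print(path, st.st_size)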

Even threads can be very helpful for directory walks. I use the following code to walk SharePoint trees and get a significant speed-up with about 50 threads. This particular program returns (path, data) pairs for all the XML files in a directory structure, and it can easily be extended for your purposes. (It is copy-pasted from my program; some further editing is needed.)

# imports needed by this snippet (the original post omitted them)
import sys, time, traceback, threading, queue
from datetime import datetime
from os import listdir
from os.path import isdir, join, getmtime

class ScanError(Exception):
    """Raised in the main thread when a worker reports a failure."""

# unique string used to pass error messages through the result queue
ERROR = '\xffERROR\xff'

class ScanWorker(threading.Thread):
    """Worker class for scanning directory structures.
    pathQueue: queue for pathnames of directories
    resultQueue: results of processFile, pairs of (path, data) to be updated
    """
    lock = threading.Lock()
    dirCount = 0
    def __init__(self, pathQueue, resultQueue):
        self.pathQueue = pathQueue
        self.resultQueue = resultQueue
        super().__init__()

    def run(self):
        """Worker thread.
        Get a directory, process it, and put new directories on the
        queue."""
        try:
            while True:
                self.processDir(self.pathQueue.get())
                self.pathQueue.task_done()
        except Exception as e:
            #pass on exception to main thread
            description = traceback.format_exception(*sys.exc_info())
            description.insert(0,
                "Error in thread {}:\n".format(
                    threading.current_thread().name))
            self.resultQueue.put((ERROR, description))
            self.pathQueue.task_done()

    def processDir(self, top):
        """Visit a directory
        Call self.processFile on every file, and queue the directories.
        """
        #Wait and retry a few times in case of network errors.
        #SharePoint is not reliable, gives errors for no reason
        for retryCount in range(30):
            try:
                names = listdir(top)
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="L", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("List: too many retries")
            raise lastError
        #it is not important to worry about race conditions here
        self.__class__.dirCount += 1
        #process contents
        for name in names:
            if isdir(join(top, name)): self.pathQueue.put(join(top, name))
            else: self.processFile(join(top, name))

    def processFile(self, path):
        """Get XML file.
        """
        #only xml files
        if not path.lower().endswith('.xml'): return
        filemtime = datetime.fromtimestamp(getmtime(path))
        #SharePoint is not reliable, gives errors for no reason; just retry
        for retryCount in range(30):
            try:
                data = open(path,'rb').read()
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="R", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("Read: too many retries")
            raise lastError
        self.resultQueue.put((path, data))

class Scanner:
    """Interface to the ScanWorkers
    Sharepoint is pretty fast compared to its delay and handles 50 workers well
    Make sure you only create one instance of Scanner!
    """
    def __init__(self, workers):
        #don't restrict the path queue length; this causes deadlock
        #we use a LIFO queue to get more depth-first like search
        #reducing average queue length and hopefully improving server caching
        self.pathQueue = queue.LifoQueue()
        #this is the output queue to the main thread
        self.resultQueue = queue.Queue(5)
        self.workers = workers
        #start workers
        for i in range(workers):
            t = ScanWorker(self.pathQueue, self.resultQueue)
            t.daemon = True
            t.start()

    def startWorkers(self, path):
        #add counter
        self.added = 0
        #and go
        self.pathQueue.put(path)

    def processResult(self, wait=True):
        """Get an element from the result queue, and add to the zip file."""
        path, data = self.resultQueue.get(block=wait)
        if path==ERROR:
            #process gave alarm; stop scanning
            #pass on description
            raise ScanError(data)
        # <do whatever you want to do with the file>
        self.resultQueue.task_done()
        self.added += 1

#main
try:
    #set up
    scanner = Scanner(threads)
    scanner.startWorkers(rootpath)
    pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
    #scanner is rolling; wait for it to finish
    with pathQueue.all_tasks_done:
        while pathQueue.unfinished_tasks:
            #tasks are still running
            #process results
            while True:
                try: scanner.processResult(wait=False)
                except queue.Empty: break
            #no new files found; check if scanner is ready
            done = pathQueue.all_tasks_done.wait(timeout=1)
            if not done:
                #Not yet; print something while we wait
                print(
                    "\rProcessed {} files from {} directories [{} {}]  "
                    .format(
                        scanner.added,
                        ScanWorker.dirCount,
                        pathQueue.unfinished_tasks,
                        resultQueue.unfinished_tasks,
                    ), end='\r')
    #just to make sure everybody is ready: join the path queue
    pathQueue.join()
    #process remaining of result queue
    while resultQueue.unfinished_tasks: scanner.processResult(wait=True)
    #go to new line to prevent overwriting progress messages
    print()
except ScanError as e:
    print()
    print(*e.args[0], end='')
    print("Process interrupted.")
except KeyboardInterrupt:
    print("\nProcess interrupted.")
print()

Which modules did you import? - Ari
