如何在Python中限制并发线程数?

6

我该如何在Python中限制并发线程数量?

比如,我有一个包含许多文件的目录,我想要处理所有这些文件,但是每次只允许4个文件同时并行处理。

下面是我目前的代码:

def process_file(fname):
        # open file and do something                                                                                            

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files=glob.glob(d + '/*')
    q=Queue.Queue()
    for fname in files:
        t=threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

如何修改代码以便一次只运行4个线程?

请注意,我希望等待所有文件被处理后再继续并处理已处理的文件。


2
你尝试过multiprocess池吗?在Python 3中,你也可以使用futures - javex
2
你也可以在Python 2中使用futures,只需要安装回溯版本即可。 - abarnert
concurrent.futures确实是实现它的最佳方式 - JBernardo
你可以使用multiprocessing.pool.ThreadPool来轻松限制线程数,就像这个答案中展示的那样。 - martineau
2个回答

9
例如,我有一个包含许多文件的目录,我想处理所有这些文件,但每次只能同时并行处理4个文件。
这正是线程池所做的:您创建作业,池会同时并行运行4个作业。通过使用执行器,您可以使事情变得更加简单,只需将函数(或其他可调用对象)交给执行器,它会返回结果的future。您可以自己构建所有这些内容,但不必这样做。
标准库的concurrent.futures模块是最简单的方法来实现这一点。(对于Python 3.1及以前版本,请参见backport。)事实上,其中一个主要示例非常接近您想要做的事情。但让我们将其调整为您的确切用例:
def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

如果你想让process_file返回一些内容,那几乎和之前一样简单:
def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

如果你想处理异常...那就看看这个例子吧;只需要在调用result()的周围加上try/except即可。


如果您想自己构建它们,那并不难。multiprocessing.pool的源代码编写和注释都很好,而且并不复杂,大部分困难的东西与线程无关;concurrent.futures的源代码甚至更简单。

0

我用过这种技术几次,但我认为它有点丑陋:

import threading

def process_something():
    something = list(get_something)

    def worker():
        while something:
            obj = something.pop()
            # do something with obj

   threads = [Thread(target=worker) for i in range(4)]
   [t.start() for t in threads]
   [t.join() for t in threads]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接