Python:并行运行子进程

35

我有以下代码,它将md5sums写入日志文件

for file in files_output:
    p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
  1. 这些会同时进行吗?例如,如果对于其中一个文件,md5sum需要很长时间,那么在等待上一个文件完成之前是否会启动另一个文件的处理?

  2. 如果上述答案是肯定的,我可以假设写入日志文件的md5sum顺序可能会因为每个文件的md5sum所需时间不同而有所不同吗?(一些文件可能非常大,一些可能很小)

3个回答

31
  1. 是的,这些md5sum进程将会并行启动。
  2. 是的,md5sum写入的顺序将是不可预测的。一般来说,从许多进程共享单个资源(如文件)被认为是一种不良做法。

此外,您在for循环后使用p.wait()将只等待最后一个md5sum进程完成,而其余进程可能仍在运行。

但是,您可以稍微修改此代码,以便即使将md5sum输出收集到临时文件中,也可以获得并行处理和同步输出可预测性的好处,并在所有进程完成后将其收集回一个文件中。

import subprocess
import os

processes = []
for file in files_output:
    f = os.tmpfile()
    p = subprocess.Popen(['md5sum',file],stdout=f)
    processes.append((p, f))

for p, f in processes:
    p.wait()
    f.seek(0)
    logfile.write(f.read())
    f.close()

所以我猜这里的顺序是被保留的,因为processes[]跟踪它?也就是说,process.append((p,f))在md5sum完成之前按照files_output的顺序执行。 - imagineerThat
2
是的,processes[] 将保留 files_output[] 的原始顺序,并确保每个 md5sum 进程都已完成。但如果您担心操作系统的资源,您应该考虑使用任务队列和线程池,并在每个线程中同步运行 md5sum,如 @Alfe 建议的那样,使用 subprocess.check_output() - dkz

25

所有子进程并行运行。(为避免这种情况,必须显式等待它们完成。)它们甚至可以同时写入日志文件,导致输出混乱。为了避免这种情况,您应该让每个进程写入不同的日志文件,并在所有进程完成时收集所有输出。

q = Queue.Queue()
result = {}  # used to store the results
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # Sentinel?
      return
    subprocess_stuff_using(fileName)
    wait_for_finishing_subprocess()
    checksum = collect_md5_result_for(fileName)
    result[fileName] = checksum  # store it

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one Sentinel marker for each thread

之后应该将结果存储在result中。


谢谢。然而,我有成千上万个md5sums。我宁愿不为每个单独打开一个文件。 - imagineerThat
4
不,你不应该这样做。创建一个Queue.Queue和一些十几个线程的线程池,让每个线程从队列中读取一个元素并为此元素启动一个子进程,等待该子进程完成,获取结果(md5校验和),将结果存储在映射中。如果队列为空,则线程应终止。 - Alfe
Python字典是线程安全的(实际上所有的Python数据结构都是线程安全的,这反过来也是Python有时会遇到的性能问题,详见全局解释器锁)。因此,在向其中写入内容时不需要使用Queue.Queue。 - Alfe
2
我不知道你的代码,必须直接查看才能确定。目前的代码是这样的:将所有任务(按原始顺序)放入队列中,并告诉20个工作人员每个人都要执行以下操作:从队列中取出一个任务并处理,直到从队列中获得EOF(无)。因为工作人员是并行工作的,所以最后一个获得第一个任务的工作人员(第20个任务)可能会是第一个完成任务的工作人员。这将改变结果到达的顺序。但这取决于任务需要的时间。 - Alfe
1
此代码中的结果存储在结果映射中,因此在所有结果都被收集后,您可以遍历原始列表(其顺序是您想要的)并从结果字典中获取匹配结果。 - Alfe
显示剩余4条评论

13

使用线程池并从主进程向文件写入是收集并行 md5sum 子进程输出的简单方法:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
               logfile.write(output)
  • md5sum的输出很小,因此可以将其存储在内存中
  • imap保留顺序
  • number_of_processes可能与文件数或CPU核心数不同(更大的值并不意味着更快:它取决于IO(磁盘)和CPU相对性能的关系)

您可以尝试一次性传递多个文件给md5sum子进程。

在这种情况下,您不需要外部的子进程;您可以在Python中计算md5

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()
为了使用多个进程而不是线程(以允许纯Python md5sum() 在利用多个CPU并行运行),只需从上面代码中的导入中删除.dummy即可。

要使用多个进程而非线程(以便利用多个CPU并行运行纯Python md5sum()),只需从上述代码中的导入中删除.dummy即可。


抱歉,我还在学习中。我不明白为什么这里没有使用队列。如果多个进程都在写入日志文件,难道不会出现问题吗?如果我错了,那么如何进行同步? - imagineerThat
看起来Pool支持异步调用。这是否意味着它按照(filenames的)顺序维护md5的顺序?而不是简单地启动x个线程? - imagineerThat
1
“Pool” 提供更高级别的接口。它在内部使用 “Queue”。 “logfile” 文件仅从主线程访问(仅在子线程中执行 “md5sum()” 函数)。 “imap()” 按顺序返回结果(正如我已经明确提到的那样)。 - jfs
你能指点我一些可以帮助我学习这些主题的资源吗?我已经尝试过谷歌搜索,但还没有找到关于多进程方面足够综合和简单介绍的资料。 - imagineerThat
这是一个广泛的话题(尝试查找并发/并行/分布式编程/计算)。你对哪些特定方面感兴趣? - jfs
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接