使用Python控制调用外部命令的子进程数量

17

我了解使用subprocess是调用外部命令的首选方式。

但是如果我想要并行运行多个命令,但又限制生成的进程数量怎么办?让我困扰的是无法阻止子进程。例如,如果我调用:

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

然后进程将继续运行,而不会等待cmd完成。因此,我不能将其封装在multiprocessing库的工作程序中。

比如,如果我这样做:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);

pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];

然后每个工作进程都会在生成子进程后完成并返回。因此,我无法通过使用Pool来限制 subprocess 生成的进程数。

有什么正确的方法可以限制子进程的数量吗?

3个回答

19

您不需要多个Python进程或者线程来限制最大并行子进程的数量:

from itertools import izip_longest
from subprocess import Popen, STDOUT

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
          for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
    for p in filter(None, processes):
        p.wait()

查看 如何在Python中按n个元素的大小迭代迭代器?

如果您想限制并发子进程的最大和最小数量,可以使用线程池:

from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call

def run(cmd):
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
    if rc != 0:
        print('{cmd} failed with exit status: {rc}'.format(**vars()))

只要 limit 中的任何一个子进程结束,就会启动一个新的子进程以始终保持 limit 个子进程。

或者使用 ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call

with ThreadPoolExecutor(max_workers=limit) as executor:
    for cmd in commands:
        executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)

这是一个简单的线程池实现:

import subprocess
from threading import Thread

try: from queue import Queue
except ImportError:
    from Queue import Queue # Python 2.x


def worker(queue):
    for cmd in iter(queue.get, None):
        subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
    t.daemon = True
    t.start()

for cmd in commands:  # feed commands to threads
    q.put_nowait(cmd)

for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join()    # wait for completion

为避免过早退出,请添加异常处理。

如果你想把子进程的输出保存在字符串中,请参见Python:并行执行cat子进程


8
你可以使用subprocess.call,如果你想等待命令完成。有关更多信息,请参见pydoc subprocess
你还可以在工作进程中调用Popen.wait方法。
def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
    p.wait()

因为这个答案似乎有些混淆,所以这里提供一个完整的示例:

import concurrent.futures
import multiprocessing
import random
import subprocess


def worker(workerid):
    print(f"start {workerid}")
    p = subprocess.Popen(["sleep", f"{random.randint(1,30)}"])
    p.wait()
    print(f"stop {workerid}")
    return workerid


def main():
    tasks = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        for i in range(20):
            tasks.append(pool.submit(worker, i))

        print("waiting for tasks...", flush=True)
        for task in concurrent.futures.as_completed(tasks):
            print(f"completed {task.result()}", flush=True)
        print("done.")


if __name__ == "__main__":
    main()

如果你运行上面的代码,你会发现所有worker进程开始以并行方式运行,并且我们能够在它们完成时收集值。

2
这将完全禁用并行处理。 - qed
2
不应该。问题是使用multiprocessing模块,每个工作进程都在单独的进程中生成,因此在一个工作进程中进行wait()操作不会阻止其他工作进程运行。话虽如此,这本身并不正确--这个例子没有从工作进程中return任何内容,因此调用结果上的.get()不会返回任何内容。 - larsks
不,等待阻塞和其他工作线程是在函数返回后才生成的。[这个例子演示了即使第一个命令非常缓慢,第二个命令也会在第一个命令之后运行。] (https://paste.ee/p/pwaXV) - asynts
1
有没有人读注释?它不会阻塞其他工作进程,wait()在一个工作子进程中运行。@asynts 我为你更新答案,并提供完整的示例。 - larsks
1
我明白你的意思了,已经取消了踩的评价。 - asynts

0
我的解决方案是创建一个进程列表,并为其设置最大可用长度。
每次在列表中添加新进程之前,我都会检查列表。
代码:
import subprocess
import time

max_num_process = 16
check_gap = .1 # sec

def check_processes(processes):
    # check & wait until available
    while True:
        if len(processes) < max_num_process:
                break

        for process in processes:
            if process.poll() is not None:  # not running
                processes.remove(process)
        time.sleep(check_gap)
    return processes

processes = []
for command in commands:
    processes = check_processes(processes)
    processes.append(subprocess.Popen(command, shell=True))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接