我编写了一个脚本来启动多个进程(简单的单元测试)以并行方式运行。它将使用num_workers
并行进程一次性完成N
项工作。
我的第一个实现是将进程分为num_workers
批,并且似乎运行良好(我在这里使用false
命令来测试其行为)。
import subprocess
errors = 0
num_workers = 10
N = 100
i = 0
while i < N:
processes = []
for j in range(i, min(i+num_workers, N)):
p = subprocess.Popen(['false'])
processes.append(p)
[p.wait() for p in processes]
exit_codes = [p.returncode for p in processes]
errors += sum(int(e != 0) for e in exit_codes)
i += num_workers
print(f"There were {errors}/{N} errors")
然而,这些测试所需的时间并不相等,有时我需要等待一个耗时较长的测试完成。因此,我对其进行了重写,以便在任务完成后立即分配新的任务。
import subprocess
import os
errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
processes = set()
while completed < N:
if assigned < N:
p = subprocess.Popen(['false'])
processes.add((assigned, p))
assigned += 1
if len(processes) >= num_workers or assigned == N:
os.wait()
for i, p in frozenset(processes):
if p.poll() is not None:
completed += 1
processes.remove((i, p))
err = p.returncode
print(i, err)
if err != 0:
errors += 1
print(f"There were {errors}/{N} errors")
然而,这会导致最后几个进程的结果错误。例如,在上面的例子中,它产生了98/100的错误,而不是100。我检查过了,这与并发无关;由于某种原因,最近的两个作业返回了退出代码0。
为什么会发生这种情况?
errors
变量。 - BingsFos.wait()
会覆盖poll()
的返回码。 - Nathan Vērzemnieks