通过fork()运行多个子进程的最佳方法是什么?

10

一个Python脚本需要通过fork()生成多个子进程。所有这些子进程都应该同时运行,父进程应该等待它们全部完成。如果能够设置一些超时时间来控制“慢”子进程的执行,那就更好了。 当所有子进程被收集后,父进程将继续处理脚本的其余部分。

有什么最好的解决办法吗?谢谢。

5个回答

12
简单例子:
import os
children = []
for job in jobs:
    child = os.fork()
    if child:
        children.append(child)
    else:
        pass  # really should exec the job
for child in children:
    os.waitpid(child, 0)

超时处理一个慢速子进程需要更多的工作;你可以使用wait而不是waitpid,并从子进程列表中剔除返回的值,而不是依次等待每个子进程(如此处)。如果你设置了一个带有SIGALRM处理程序的alarm,你可以在指定的延迟后终止等待。这都是标准的UNIX内容,与Python无关...

子进程执行完毕后应该退出吗? - zuo
@zuo 子进程应该使用 exec(将其替换为要运行的子进程)或 _exit - ephemient

7

Ephemient: 在您的代码中,每个子进程在完成任务后都会留在for循环中。他们将一遍又一遍地进行分叉。此外,当children []不为空时启动的子进程将尝试在循环结束时等待一些兄弟姐妹。最终会有人崩溃。这是一个解决方法:

import os, time

def doTheJob(job):
    for i in xrange(10):
        print job, i
        time.sleep(0.01*ord(os.urandom(1)))
        # random.random() would be the same for each process

jobs = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"]
imTheFather = True
children = []

for job in jobs:
    child = os.fork()
    if child:
        children.append(child)
    else:
        imTheFather = False
        doTheJob(job)
        break

# in the meanwhile 
# ps aux|grep python|grep -v grep|wc -l == 11 == 10 children + the father

if imTheFather:
    for child in children:
        os.waitpid(child, 0)

1
根据我源代码中的注释,需要将“pass”替换为“exec(job)或类似内容”。如果这样做,一切都会按预期工作。另外,最好使用os.exit()而不是设置imTheFather。 - ephemient
@ephemient 但是为什么os._exit()更好呢? - Alois Mahdal
@AloisMahdal 使用os._exit来退出分叉的子进程 http://docs.python.org/2/library/os.html#os._exit - Marconi

2

1

与子进程通信的传统UNIX方式是打开管道到它们的标准输入/输出,并使用select()系统调用在父进程中复用通信(通过Python中的select模块可用)。

如果需要终止运行缓慢的子进程,只需保存其进程ID(由os.fork()调用返回),然后在不再需要时使用os.kill()将其杀死。当然,更好的方法可能是能够明确地与子进程通信并告诉它关闭自己。


0

我以前用过perl做过这个。现在正在学习python,想要复制这个功能。一个未知数量的分叉任务调度程序必须跟踪运行中的任务、已结束的任务和返回代码。这段代码包括SIGCHLD处理程序的代码、父任务和一个简单的子任务。

#!/usr/bin/env python3
import signal, traceback
import os, subprocess
import time
#
#   sigchild handler for reaping dead children
#
def handler(signum, frame):
#
#   report stat of child tasks  
    print(children)
#
#   use waitpid to collect the dead task pid and status
    pid, stat = os.waitpid(-1, 0)
    term=(pid,stat)
    print('Reaped: pid=%d stat=%d\n' % term)
#
#   add pid and return code to dead kids list for post processing
    ripkids.append(term)
    print(ripkids)
    print('\n')
#
#   update children to remove pid just reaped
    index = children.index(pid)
    children.pop(index)
    print(children)   
    print('\n')

# Set the signal handler 
signal.signal(signal.SIGCHLD, handler)

def child():
   print('\nA new child ',  os.getpid())
   print('\n')
   time.sleep(15)
   os._exit(0)  

def parent():
#
# lists for started and dead children
   global children
   children = []
   global ripkids
   ripkids = []

   while True:
      newpid = os.fork()
      if newpid == 0:
         child()
      else:
         pidx = (os.getpid(), newpid)
         children = children+[newpid]
         print("parent: %d, child: %d\n" % pidx)
         print(children)
         print('\n')
      reply = input("q for quit / c for new fork")
      if reply == 'c': 
          continue
      else:
          break

parent()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接