Python多进程 - AssertionError:只能加入子进程

15

我第一次尝试使用Python的multiprocessing模块,但是遇到了一些问题。虽然我非常熟悉threading模块,但我需要确保我执行的进程是并行运行的。

这是我尝试做的事情的大纲,请忽略未声明的变量/函数之类的东西,因为我无法完整粘贴我的代码。

import multiprocessing
import time

def wrap_func_to_run(host, args, output):
    output.append(do_something(host, args))
    return

def func_to_run(host, args):
    return do_something(host, args)

def do_work(server, client, server_args, client_args):
    server_output = func_to_run(server, server_args)
    client_output = func_to_run(client, client_args)
    #handle this output and return a result
    return result

def run_server_client(server, client, server_args, client_args, server_output, client_output):
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
    server_process.start()  
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
    client_process.start()
    server_process.join()
    client_process.join()
    #handle the output and return some result    

def run_in_parallel(server, client):
    #set up commands for first process
    server_output = client_output = []
    server_cmd = "cmd"
    client_cmd = "cmd"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
    process_one.start()
    #set up second process to run - but this one can run here
    result = do_work(server, client, "some server args", "some client args")
    process_one.join()
    #use outputs above and the result to determine result
    return final_result

def main():
    #grab client
    client = client()
    #grab server
    server = server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()

这是我收到的错误信息:

Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process

我尝试了许多不同的方法来修复这个问题,但我的感觉是我使用这个模块的方式出现了问题。

编辑:

因此,我创建了一个文件,通过模拟客户端/服务器和它们所做的工作来重现这个问题 - 另外我错过了一个重要的点,那就是我在Unix上运行这个文件。另一个重要的信息是,在我的实际情况下,do_work 涉及使用 os.fork()。如果没有使用os.fork(),我无法重现错误,所以我认为问题出现在那里。在我的真实世界的案例中,那部分代码不是我写的,所以我像黑盒子一样处理它(可能是我的错误)。无论如何,这里是重现该问题的代码:

#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x+=1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x+=1
        print x
        time.sleep(1)

def func_to_run(host, args):
    print host.name + " is working"
    host.work()
    print host.name + ": " + args
    return "done"

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output =="done")

def run_server_client(server, client, server_args, client_args):
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"

def run_in_parallel(server, client):
    #set up commands for first process
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    result = do_work(server, client, "server args from do work", "client args from do work")
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return result

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()
如果我在 do_work 中删除 os.fork() 的使用,就不会出现错误,并且代码的行为和之前预期的一样(除了输出传递,我已经接受了这是我的错误/误解)。我可以更改旧代码以不使用 os.fork(),但我也想知道为什么会导致此问题,是否有可行的解决方案。
编辑2:
在接受答案之前,我开始研究一个省略 os.fork() 的解决方案。这是我所拥有的,稍微调整了可以完成的模拟工作量-
#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x+=1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x+=1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q):
    print host.name + " is working"
    host.work(w)
    print host.name + ": " + args
    q.put("ZERO")
    return "done"

def handle_queue(queue):
    done = False
    results = []
    return_val = 0
    while not done:
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            return_val = 1
    return return_val

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output =="done")



def run_server_client(server, client, server_args, client_args, w, mq):
    local_queue = multiprocessing.Queue()
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
    if handle_queue(local_queue) == 0:
        mq.put("ZERO")

def run_in_parallel(server, client):
    #set up commands for first process
    master_queue = multiprocessing.Queue()
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    #result = do_work(server, client, "server args from do work", "client args from do work")
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return_val = handle_queue(master_queue)
    print return_val
    return return_val

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    val = run_in_parallel(server, client)
    if val:
        print "failed"
    else:
        print "passed"
    return val

if __name__ == "__main__":
    main()

这段代码有一些微调的打印输出,只是为了查看发生了什么。我使用了一个多进程队列来存储和共享进程间的输出,并将其传回到我的主线程中进行处理。我认为这解决了我问题中的Python部分,但我正在处理的代码仍然存在一些问题。我唯一能说的就是与 func_to_run 相当的事情涉及通过SSH发送命令并抓取任何错误以及输出。由于某种原因,对于执行时间较短的命令,这个方法非常完美地工作,但对于执行时间/输出较长的命令则不行。我尝试使用这里代码中截然不同的工作值模拟这个问题,但没有能够复制类似的结果。

编辑3 我正在使用的库代码(同样不是我的)使用了 Popen.wait() 来执行 SSH 命令,我刚读到这个:

  

Popen.wait()   等待子进程终止。设置并返回 returncode 属性。

     

警告当使用 stdout=PIPE 和/或 stderr=PIPE 并且子进程生成足够的输出到管道,以至于它阻塞等待 OS 管道缓冲区接受更多数据时,这将导致死锁。使用 communicate() 来避免。

我调整了代码,不再缓冲,而是按接收到的样子直接打印,一切正常工作。


2
你有多个问题。首先:当你使用multiprocessing模块时,output.append()可能不会按照你的意愿执行。其次,你报告的问题如下:你不能在当前执行的进程中调用未被拥有的Process上的.join()方法。你承认你的例子是人工合成的,所以很难确定问题出在哪里。你确定你像这个例子一样将Process()的结果分配给一个短暂的局部变量吗?或者你是在使用全局或实例变量(例如self.process_one)? - Brian Cain
对于第一个问题 - 你是对的。当我使用线程时它可以工作,但可能是因为共享内存空间。进程的变量是本地的。这些进程不是由调用进程拥有吗?我只在创建它们的函数中加入进程,所以我会假设所有权没问题,除非有一些调度问题。 - Ian Panzica
啊!我现在明白了,你正在一个atexit处理程序中调用join,但只有当multiprocessing试图清理自己时才会这样做。我猜测这是将Process实例传递给另一个进程的结果。如果是这种情况,那么在CPython中,这是一个微妙的错误。Process应该拒绝被pickle并在尝试传递它时给出异常。我知道创建一个最小的复现者不容易,但我认为你会发现这值得你的时间。它将帮助您和我们识别错误的关键元素。 - Brian Cain
嗨@BrianCain-我已经更新了问题本身,并附上了一些更多的想法。抱歉之前遗漏了os.fork()部分。我知道它很重要,只是没想到它会有所不同。 - Ian Panzica
3个回答

5
我可以更改旧代码以不使用os.fork(),但我也想知道为什么会导致这个问题,是否有可行的解决方案。理解问题的关键是确切地知道fork()的作用。CPython文档说明了“创建一个子进程”,但这假定您了解C库调用fork()。Glibc的手册如下所示: “fork()通过复制调用进程来创建新进程。新进程称为子进程,是调用进程的精确副本,称为父进程,除了以下几点:...”基本上就像您拿出程序并复制其程序状态(堆栈,指令指针等),稍微有些不同,并让它独立于原始程序执行。当此子进程自然退出时,它将使用exit(),这将触发multiprocessing模块注册的atexit()处理程序。你能做些什么来避免它?
  • 省略 os.fork(): 使用multiprocessing替代,就像您现在正在探索的那样
  • 可能有效: 在执行fork()后,只在子进程或父进程中根据需要导入multiprocessing
  • 在子进程中使用 _exit() (CPython文档指出,“注意退出的标准方法是sys.exit(n)。_exit()通常仅在fork()之后的子进程中使用。”)

https://docs.python.org/2/library/os.html#os._exit


我会接受这个答案。非常感谢你的帮助!我最后一次编辑了问题,并加入了一些注释和我已经开始实现的第一个建议。 - Ian Panzica

2

除了Cain提供的优秀解决方案,如果你面临和我一样无法控制子进程创建的情况,你可以尝试在你的子进程中注销atexit函数以消除这些消息:

import atexit
from multiprocessing.util import _exit_function

atexit.unregister(_exit_function)

注意: 这可能导致泄漏。例如,如果您的子进程有自己的子进程,它们将不会被清除。因此,请澄清您的情况并在之后进行彻底的测试。


0

我觉得你多次进行了线程处理。我不会从run_in_parallel中进行线程处理,而是直接使用正确的参数调用run_server_client,因为它们会在内部进行线程处理。


但是这样不会一直阻塞吗?我需要同时运行run_server_clientdo_work,这就是为什么我在这里创建了一个单独的进程。 - Ian Panzica

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接