使用Python多进程管道

13
我试图编写一个类,使用多个进程来计算校验和,从而利用多个内核。我有一个非常简单的类可以做到这一点,在执行简单案例时效果很好。但是,每当我创建两个或更多的类实例时,工作进程就永远不会退出。似乎它从未收到父进程关闭管道的消息。
下面是所有的代码。我首先单独计算md5和sha1校验和,这样可以正常运行,然后我尝试并行计算,但是当关闭管道时,程序锁定了。
在这里发生了什么?为什么管道没有按照我的预期工作?我想我可以通过在队列上发送“停止”消息并使子进程以此退出来解决这个问题,但是我真的很想知道为什么这不起作用。
import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

PS. 问题已解决 如果有人感兴趣,这是上面代码的可工作版本:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

你可以在 self.parent_conn.close() 后添加 ChecksumPipe.all_open_parent_conns.remove(self.parent_conn),以便销毁连接。 - Maxim Egorushkin
self.summer = eval("hashlib.%s()" % csname) 看起来不太美观。那么 self.summer = getattr(hashlib, csname)() 呢? - glglgl
1个回答

7

是的,这确实是令人惊讶的行为。

然而,如果您查看两个并行子进程的lsof输出,很容易注意到第二个子进程打开了更多文件描述符。

发生的情况是,当启动两个并行子进程时,第二个子进程继承了父进程的管道,因此当父进程调用self.parent_conn.close()时,第二个子进程仍具有该管道文件描述符的打开状态,从而使得内核中的该管道文件描述符未被关闭(引用计数大于0),其结果是在第一个并行子进程中,self.child_conn.recv_bytes()从未执行read()以及永远不会抛出EOFError

您可能需要发送一个显式的关闭消息,而不仅仅是关闭文件描述符,因为似乎很难控制哪些进程之间共享哪些文件描述符(没有close-on-fork文件描述符标志)。


谢谢!这为我解决了问题。我在我的示例中通过使用一个共享类变量来包含所有实例中的所有打开连接,以便子进程可以关闭它们不需要的所有套接字。 - Mats Ekberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接