我试图编写一个类,使用多个进程来计算校验和,从而利用多个内核。我有一个非常简单的类可以做到这一点,在执行简单案例时效果很好。但是,每当我创建两个或更多的类实例时,工作进程就永远不会退出。似乎它从未收到父进程关闭管道的消息。
下面是所有的代码。我首先单独计算md5和sha1校验和,这样可以正常运行,然后我尝试并行计算,但是当关闭管道时,程序锁定了。
在这里发生了什么?为什么管道没有按照我的预期工作?我想我可以通过在队列上发送“停止”消息并使子进程以此退出来解决这个问题,但是我真的很想知道为什么这不起作用。
下面是所有的代码。我首先单独计算md5和sha1校验和,这样可以正常运行,然后我尝试并行计算,但是当关闭管道时,程序锁定了。
在这里发生了什么?为什么管道没有按照我的预期工作?我想我可以通过在队列上发送“停止”消息并使子进程以此退出来解决这个问题,但是我真的很想知道为什么这不起作用。
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
PS. 问题已解决 如果有人感兴趣,这是上面代码的可工作版本:
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
self.parent_conn.close()
后添加ChecksumPipe.all_open_parent_conns.remove(self.parent_conn)
,以便销毁连接。 - Maxim Egorushkinself.summer = eval("hashlib.%s()" % csname)
看起来不太美观。那么self.summer = getattr(hashlib, csname)()
呢? - glglgl