当有多个发送器和接收器时,我不明白为什么说Pipes
是不安全的。
如果是这种情况,以下代码如何转换为使用Queues
的代码?Queues
在关闭时不会抛出EOFError
异常,因此我的进程无法停止。我应该无限发送“毒药”消息来告诉它们停止(这样,我确信所有进程至少接收到一条毒药消息)吗?
我想保持管道p1
的打开状态,直到我决定关闭它(在这里是发送了10条消息)。
from multiprocessing import Pipe, Process
from random import randint, random
from time import sleep
def job(name, p_in, p_out):
print(name + ' starting')
nb_msg = 0
try:
while True:
x = p_in.recv()
print(name + ' receives ' + x)
nb_msg = nb_msg + 1
p_out.send(x)
sleep(random())
except EOFError:
pass
print(name + ' ending ... ' + str(nb_msg) + ' message(s)')
if __name__ == '__main__':
p1_in, p1_out = Pipe()
p2_in, p2_out = Pipe()
proc = []
for i in range(3):
p = Process(target=job, args=(str(i), p1_out, p2_in))
p.start()
proc.append(p)
for x in range(10):
p1_in.send(chr(97+x))
p1_in.close()
for p in proc:
p.join()
p1_out.close()
p2_in.close()
try:
while True:
print(p2_out.recv())
except EOFError:
pass
p2_out.close()
recv
和send
时使用multiprocessing.Lock()
,那么它会变得安全(且高效)吗? - Danielmultiprocessing.Queue
是带有一对锁定的“管道”(每个方向一个)。 因此,它是安全且相当有效的,但你也将直接重新发明轮子——为什么不直接使用Queue
呢? - nneonneo