为什么Python的多进程管道不安全?

11

当有多个发送器和接收器时,我不明白为什么说Pipes是不安全的。

如果是这种情况,以下代码如何转换为使用Queues的代码?Queues在关闭时不会抛出EOFError异常,因此我的进程无法停止。我应该无限发送“毒药”消息来告诉它们停止(这样,我确信所有进程至少接收到一条毒药消息)吗?

我想保持管道p1的打开状态,直到我决定关闭它(在这里是发送了10条消息)。


from multiprocessing import Pipe, Process
from random import randint, random
from time import sleep

def job(name, p_in, p_out):
    print(name + ' starting')
    nb_msg = 0
    try:
        while True:
            x = p_in.recv()
            print(name + ' receives ' + x)
            nb_msg = nb_msg + 1
            p_out.send(x)
            sleep(random())
    except EOFError:
        pass
    print(name + ' ending ... ' + str(nb_msg) + ' message(s)')

if __name__ == '__main__':
    p1_in, p1_out = Pipe()
    p2_in, p2_out = Pipe()

    proc = []

    for i in range(3):
        p = Process(target=job, args=(str(i), p1_out, p2_in))
        p.start()
        proc.append(p)

    for x in range(10):
        p1_in.send(chr(97+x))
    p1_in.close()
    for p in proc:
        p.join()
    p1_out.close()
    p2_in.close()

    try:
        while True:
            print(p2_out.recv())
    except EOFError:
        pass

    p2_out.close()
2个回答

17

问题实质上是Pipe只是对于平台定义的管道对象的简单包装。 recv只是简单地重复接收字节缓冲区,直到获取完整的Python对象。如果两个线程或进程在同一个管道上使用recv,则读取可能会交错,使得每个进程都拥有半个已pickle对象,从而破坏数据。Queue在进程之间执行适当的同步,但这需要更复杂的操作。

正如multiprocessing文档所说:

请注意,如果两个进程(或线程)尝试同时从管道的同一端读取或写入数据,则管道中的数据可能会损坏。当然,如果使用管道的不同端点的进程同时工作,则不存在损坏风险。

您无需无限发送毒丸;每个工作进程只需要一个毒丸即可。每个工作进程在退出之前仅接收一条毒丸消息,因此不存在工作进程会遗漏消息的危险。

您还应该考虑使用multiprocessing.Pool而不是重新实现“工作进程”模型-Pool具有许多方法,使跨多个线程分发工作变得非常容易。


如果我在使用管道的recvsend时使用multiprocessing.Lock(),那么它会变得安全(且高效)吗? - Daniel
3
如果这样做,你基本上会得到一个“队列”——multiprocessing.Queue是带有一对锁定的“管道”(每个方向一个)。 因此,它是安全且相当有效的,但你也将直接重新发明轮子——为什么不直接使用Queue呢? - nneonneo

8

我不明白为什么当有多个发送者和接收者时,管道被认为是不安全的。

考虑一下,你同时从源A和B将水放入管道中。在管道的另一端,你无法确定哪部分水来自A或B,对吧? :)

管道在字节级别上传输数据流。如果没有在其上层的通信协议,它就不知道什么是消息,因此无法确保消息完整性。因此,不仅使用具有多个发送者的管道“不安全”,而且这是一个主要的设计缺陷,很可能会导致通信问题。

然而,队列是在更高的层次上实现的。它们被设计用于通信消息(甚至抽象对象)。队列是为了保持消息/对象的自包含性而设计的。多个来源可以将对象放入队列中,多个消费者可以拉取这些对象,而可以100%地确保任何作为单元进入队列的东西也作为单元出队列。

经过相当长时间的编辑:

我应该补充说,在字节流中,所有字节按发送顺序检索(保证)。多个发送者的问题在于,发送顺序(输入顺序)可能已经不清楚或随机,即多个流可能以不可预测的方式混合。

常见的队列实现保证单个消息保持完整,即使有多个发送者。消息也按照发送顺序检索。然而,在具有多个竞争发送者且没有进一步同步机制的情况下,再次无法保证输入消息的顺序。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接