多进程队列在进程终止时阻塞

4

我有一个子进程,使用 multiprocessing.Process,和一个队列,使用 multiprocessing.Queue

主进程正在使用 multiprocessing.Queue.get() 获取一些新数据。我不想在那里设置超时,并且希望它是阻塞的。

但是,当子进程由于任何原因(手动由用户通过 kill 杀死或发生段错误等)停止运行时,Queue.get() 将永远挂起。

我该怎么避免这种情况?

2个回答

5
我认为multiprocessing.Queue不是我想要的。
我现在正在使用的是:
parent_conn, child_conn = multiprocessing.Pipe(duplex=True)

要获得两个 multiprocessing.Connection 对象。然后我使用 os.fork() 或者使用 multiprocessing.Process。在子进程中,我执行以下操作:

parent_conn.close()
# read/write on child_conn

在父进程(fork之后),我执行以下操作:
child_conn.close()
# read/write on parent_conn

这样,当我在连接上调用recv()时,如果子进程/父进程在此期间停止运行,它将引发异常(EOFError)。

请注意,这仅适用于单个子进程。如果您想要多个子进程,则可能需要使用Queue。在这种情况下,您可能会有一些管理器来监视所有子进程是否存活并相应地重新启动它们。


不错的发现。我在这里回复时尝试使用了“Pipe”,但当客户端死亡时并没有收到EOF错误。我想我忘记关闭连接了。 - Jorgen Schäfer

2
Queue 不知道何时不再有可用的写入者。您可以将对象传递给任意数量的子进程,但它不知道您是否将其传递给了任何给定的子进程。因此,即使某个子进程死亡,它仍然必须等待。队列不是在子进程死亡时自动关闭的文件描述符。
您需要的是一种类似于监督员的东西,在父进程中注意到子进程意外死亡,并以您认为合适的方式处理该情况。您可以通过捕获 SIGCHLD 进程、检查 Process.is_alive 或在线程中使用 Process.join 来实现这一点。简单的实现将在 Queue.get 调用中使用 timeout 参数,并在返回时进行 Process.is_alive 检查。
如果您对子进程的结束有更多控制,则应向队列发送“EOF”类型的对象(None 或某种标记),以便您的父进程可以正确处理它。

1
当然我可以发送EOF,但那永远无法涵盖所有情况,例如用户使用kill -9、段错误或类似情况。timeout也不是真正的解决方案,因为我不想让它完全挂起——没有理由无缘无故地挂起。我可以使用SIGCHLD,但那似乎有点过度杀伤力。后台线程似乎也浪费了一些资源,只是为了做一些更简单的事情。 - Albert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接