我有一个子进程,使用 multiprocessing.Process
,和一个队列,使用 multiprocessing.Queue
。
主进程正在使用 multiprocessing.Queue.get()
获取一些新数据。我不想在那里设置超时,并且希望它是阻塞的。
但是,当子进程由于任何原因(手动由用户通过 kill
杀死或发生段错误等)停止运行时,Queue.get()
将永远挂起。
我该怎么避免这种情况?
我有一个子进程,使用 multiprocessing.Process
,和一个队列,使用 multiprocessing.Queue
。
主进程正在使用 multiprocessing.Queue.get()
获取一些新数据。我不想在那里设置超时,并且希望它是阻塞的。
但是,当子进程由于任何原因(手动由用户通过 kill
杀死或发生段错误等)停止运行时,Queue.get()
将永远挂起。
我该怎么避免这种情况?
multiprocessing.Queue
不是我想要的。parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
要获得两个 multiprocessing.Connection
对象。然后我使用 os.fork()
或者使用 multiprocessing.Process
。在子进程中,我执行以下操作:
parent_conn.close()
# read/write on child_conn
child_conn.close()
# read/write on parent_conn
这样,当我在连接上调用recv()
时,如果子进程/父进程在此期间停止运行,它将引发异常(EOFError
)。
请注意,这仅适用于单个子进程。如果您想要多个子进程,则可能需要使用Queue
。在这种情况下,您可能会有一些管理器来监视所有子进程是否存活并相应地重新启动它们。
Queue
不知道何时不再有可用的写入者。您可以将对象传递给任意数量的子进程,但它不知道您是否将其传递给了任何给定的子进程。因此,即使某个子进程死亡,它仍然必须等待。队列不是在子进程死亡时自动关闭的文件描述符。SIGCHLD
进程、检查 Process.is_alive
或在线程中使用 Process.join
来实现这一点。简单的实现将在 Queue.get
调用中使用 timeout
参数,并在返回时进行 Process.is_alive
检查。None
或某种标记),以便您的父进程可以正确处理它。kill -9
、段错误或类似情况。timeout也不是真正的解决方案,因为我不想让它完全挂起——没有理由无缘无故地挂起。我可以使用SIGCHLD,但那似乎有点过度杀伤力。后台线程似乎也浪费了一些资源,只是为了做一些更简单的事情。 - Albert