Python多进程队列get()超时尽管队列已满

14

我正在使用Python的multiprocessing模块进行科学并行处理。在我的代码中,我使用几个工作进程来执行重要任务,以及一个写入进程将结果保存到磁盘。要写入的数据通过队列从工作进程发送到写入进程。数据本身非常简单,仅包含一个包含文件名和两个浮点数列表的元组。经过数小时的处理后,写入进程经常会卡住。更具体地说,以下代码段:

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

循环将永远不会退出,我会持续收到“超时”消息。

此外,我还实现了一个日志记录过程,其中输出了队列的状态等信息,尽管我会收到上面的超时错误消息,但对qsize()函数的调用始终返回完整的队列(在我的情况下为48)。

我已经彻底检查了队列对象的文档,并找不到可能解释为什么get()函数同时返回超时和队列已满的情况的可能性。

有任何想法吗?

编辑:

我修改了代码以确保我捕获了空队列异常:

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

1
我有完全相同的问题。但是,我通过在超时后重试.get(),如果队列仍然是满的,就绕过了这个问题,通常会起作用,但这并不是真正的解决方案。你找到解决方案了吗? - Jaka
只是确认一下:我理解你正在使用multiprocessing.Queue而不是来自其他模块的Queue。对吗? - azelcer
  1. 你为什么确定队列中有消息?
  2. 为什么要使用带超时的 get 而不是阻塞式的 get?这个写入进程可能是一个守护进程,当主进程结束时它也会结束,所以在 get 上阻塞不会导致程序无法终止。或者你可以发送一个特殊的“哨兵”消息,比如 None,表示没有更多的消息了,如果你不想使用守护进程,它应该返回。
- Booboo
3个回答

3

3
在多进程中,队列被用作同步消息队列。这在您的问题中似乎也是如此。然而,这需要更多的操作,不仅仅是调用get()方法。在每个任务处理完成后,需要调用task_done()方法,以便将元素从队列中删除。
根据文档:

Queue.task_done()

表示已完成先前排队的任务。由队列消费者线程使用。对于每个用于获取任务的get()调用,随后的task_done()调用告诉队列该任务的处理已完成。

如果join()当前正在阻塞,则当所有项目都已处理(即接收到放入队列中的每个项目的task_done()调用)时,它将恢复。

在文档中,您还会找到正确使用线程队列的代码示例。
在您的代码中,应该像这样:
while (True):
    try:
        item = queue.get(timeout=60)
        if item is None:
            break
        # call working fuction here
        queue.task_done()
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

1
谢谢您的回复,Tomasz。然而,我怀疑task_done()并不是解决问题的方法。根据文档,task_done()仅适用于可连接队列(multiprocessing.JoinableQueue),而我没有使用它。此外,我在互联网上找到的几乎所有示例都没有使用task_done方法。 - AndreJohannes
Queue.get() 方法会移除并返回队列中的元素。您可以查看这个链接 python-queue-get-task-done-question。这可能是正确答案,但是顶部段落的解释是误导性的。 - Pavan
好的,但是我使用 multiprocessing.Queue 而不是 multiprocessing.JoinableQueue,只有后者提供 task_done() 方法。我也认为文档清楚地解释了 task_done() 方法用于让队列知道任务何时完成,以便 join() 可以解锁。这是我没有使用的一个功能。 - AndreJohannes
是的,我承认我通常在线程上下文中使用它,而不是多进程上下文中。我现在一直在查看cpython源代码,以找到可能导致您遇到问题的队列和multiprocessing.queue之间的确切差异。另外,使用普通的Queue.get(),项目不会被删除,直到被清除为止。qsize()仅返回近似大小,不计算正在处理的元素,因此理论上它可能会给出空队列,同时由于底层缓冲区已满而仍然阻塞。 - Tomasz Plaskota
Tomasz,我刚在回复noxdafox的评论中写道,我现在将重新运行我的代码并确保我获得准确的错误信息。不过这需要几个小时... - AndreJohannes
显示剩余2条评论

-1

你捕获了一个过于通用的 Exception 并假设它是一个超时错误。

尝试按照以下方式修改逻辑:

from Queue import Empty

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
        print(queue.qsize())

并查看是否仍然打印日志行。


好的,我承认我不能确定这是一个超时错误。现在我将重新运行它,并接受Queue.Empty错误。然而,如果我查看我的日志文件,我会每60秒收到“Writer: Timout occurred..”消息,这符合指定的超时时间。 - AndreJohannes
noxdafox,我已经相应地修改了代码。我肯定会抛出一个空队列错误。每60秒发生一次,而记录器进程指示队列已满。 - AndreJohannes
我稍微修改了代码,请尝试在异常后添加那行代码,并验证队列中是否确实有元素。还有一件事:队列中的这些元素有多大?是Kb?Mb?Gb? - noxdafox
队列中元素的大小实际上相当小。它们具有以下结构:(文件名,[浮点数,浮点数]),其中文件名通常不超过20个字符。我总共处理约170000个项目,队列通常在发送到输出队列的50000到80000个元素后会卡住。 好的,我会添加这行代码,尽管我的记录器进程不断监视队列大小并声称它仍然是满的。 - AndreJohannes
刚刚获取了最新运行的日志。我同时收到了一个"Writer: Timeout occured"和一个"Writer: queue size 48"的消息。不确定如何解释这个问题,但我认为它可能是队列实现内部的一个问题,尽管我觉得很奇怪之前没有人遇到过这个问题。 - AndreJohannes
这确实很奇怪。似乎当队列中有48个元素时,它总是会卡住。假设您的输入相同(相同的元素和顺序),可能是第48个元素有问题。我建议您记录一下放入“Queue”中的元素,以便您可以找出有问题的元素。另一种方法是尝试用“Pipe”替换“Queue”,看看是否得到相同的结果。 - noxdafox

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接