什么时候需要使用 Queue.join()?

4

Python 3文档给出了一个使用队列的工作线程示例(https://docs.python.org/3/library/queue.html):

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

在这个例子中,为什么需要使用 q.join()?难道后续的 q.put(None)t.join() 操作不能达到同样的效果,即阻塞主线程直到工作线程完成吗?
1个回答

5
这是我对示例的理解:
每个工作者都会无限循环,始终在等待队列中的新任务。如果它得到的项目是 None,那么它会退出并将控制权还给主程序。
所以,首先我们让程序等待队列为空。每次调用 q.task_done() 都表示一个新的任务已完成。程序挂起在下面这行代码,因此我们确保每个任务都被标记为已完成。
# block until all tasks are done
q.join()

接下来,在下方,我们将添加与工作人员数量相同数量的None项到队列中(以确保每个工作人员都会得到一项任务)。

for i in range(num_worker_threads):
    q.put(None)

接下来,我们将所有线程加入。由于我们通过队列给每个工作线程提供了一个None项目,它们都会停止。在它们全部停止并返回控制之前,我们希望在这里等待。
for t in threads:
    t.join()

通过这种方式,我们确保队列中的每个项目都得到处理,每个工作进程在队列为空时都会停止,每个工作进程在我们继续代码之前都会关闭,有助于避免孤儿进程。

这基本上也是我对这个例子的理解。所以,我的问题是,即使我们删除 q.join() 语句,队列中的每个项目都会被处理(并且所有线程都会被清理)吗?通过加入所有工作线程(最后一步),我们仍然在等待处理队列中的所有项目。 - SMX
2
是的,我们仍在等待,但我认为这是一种编码安全措施。如果在处理队列中的所有项目之前神奇地添加了None,并且一个工作人员得到了它,那会怎么样?我认为这只是一个繁琐的编程示例,以确保您理解正在发生的事情,而无需完全理解队列按FIFO(先进先出)进行。此外,这个示例可以适用于LIFO(后进先出)实现。但是,对于当前的示例,我认为您可以删除q.join(),它也可以正常运行。 - jarcobi889

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接