Python - queue.task_done()用于什么?

55

我写了一个脚本,其中有多个线程(使用 threading.Thread 创建),从一个 Queue 中获取 URL 并使用 queue.get_nowait() 进行处理 HTML。我是多线程编程的新手,对于 queue.task_done() 函数的目的感到困惑。

Queue 为空时,它会自动返回 queue.Empty 异常。因此,我不理解为什么每个线程都需要调用 task_done() 函数。我们知道队列为空时就完成了,那么为什么要通知它工作线程已经完成了它们的工作(得到了来自队列的 URL 后与队列无关的工作)?

能否提供一个代码示例(最好使用 urllib、文件 I/O 或其他内容,而不是斐波那契数列和打印“Hello”),展示这个函数在实际应用中如何使用?

4个回答

85

Queue.task_done并不是为了工作线程的利益而存在,而是为了支持Queue.join


假设我给你一箱工作任务,我在意的不是你什么时候把箱子里的所有东西拿出来。

我在意的是工作完成的时间。看一个空箱子并不能告诉我工作是否完成。你和其他5个人可能还在忙着处理从箱子里取出的任务。

Queue.task_done允许工作线程声明一个任务已经完成。等待所有工作完成的人使用Queue.join将会等到足够数量的task_done调用被发出,而不是等到队列为空。


eigenfield在评论中指出,队列具有task_done/join方法似乎非常奇怪。这是真的,但实际上是一种命名问题。queue模块选择了糟糕的名称,使其听起来像一个通用队列库,而实际上它只是一个线程通信库。

对于一个通用队列来说,拥有task_done/join方法确实很奇怪,但对于一个线程间的消息通道来说,有一种指示消息已被处理的方法是完全合理的。如果这个类被命名为thread_communication.MessageChannel而不是queue.Queuetask_done被称为message_processed,则意图将变得更加明确。

(如果你需要一个通用队列而不是线程间消息通道,请使用collections.deque。)


2
是的,但我想知道的是为什么队列需要知道任务(即处理HTML、将提取的数据插入到数据库中等)何时完成?当它为空时,我就完成了。为什么我不能只在抛出queue.Empty异常时知道我已经完成了呢?为什么URL队列需要知道我成功地将某些内容输入到数据库中或处理一些HTML呢?队列应该关心的只是所有URL是否已被分派到某个线程。 - J. Taylor
2
@J.Taylor:调用Queue.join的人需要知道。 - user2357112
2
@J.Taylor: 取决于谁需要知道工作是否完成,以及是否在执行工作的同时添加了新任务。* 工作者们 *不需要调用 task_done 来知道他们是否完成了工作。task_done 是一种向任何等待工作完成的线程传达信息的方式。 - user2357112
13
对我来说,队列暗示着任务完成/加入语义的概念是不寻常的。队列只应关注放入和获取操作以及是否为空或已满,不应该涉及任务完成相关的内容。难道我在异次元世界中吗? - daparic
3
这句话的意思是:在常见的任务分配场景中,使用队列既方便又实用。但这仅适用于需要使用它的情况。如果您从不计划在队列上调用 join(),那么您也不需要知道或关心 task_done() - user4815162342
显示剩余4条评论

22

.task_done() 用于标记处理完成,以使 .join() 知道处理已完成。

如果您使用 .join(),并且未针对每个处理的项目调用 .task_done(),则脚本将永远挂起。


没有什么比一个简短的示例更好的了;

import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False


def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue

            try:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            pass
        except:
            logging.exception('error while processing item')


def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False

2
为什么需要额外的测试“if item is None:”,当你永远不会到达它(除非你实际上在items_queue中存储None值,但现在你没有这样做)?就我所看到的,如果队列为空,则会抛出Empty异常,因此执行将“跳转”到catch子句。 - Alex
你的"If you use .join() and don't call .task_done() for every processed item, your script will hang forever."这句话是从其他地方引用的吗?如果是,请提供源链接。否则,能否提供一些信息说明这只是你(很好的)回答的要点。 - bugmenot123

18

“阅读源代码,卢克!”——欧比旺·科多比

asyncio.queue的源码非常简短。

  • 将任务加入队列时,未完成任务的数量会增加一。
  • 调用task_done时,未完成任务的数量会减少一。
  • join()等待没有未完成任务。

只有在调用task_done()时,join()才有用。使用经典的银行类比:

  • 人们从门口进来排队;门是生产者执行q.put()
  • 当柜员闲置且有人排队时,他们到柜员窗口。柜员执行q.get()
  • 当柜员完成帮助一个人后,他们准备好为下一个人服务。柜员执行q.task_done()
  • 下午5点,门被锁了。门的任务完成
  • 您等待直到队伍为空并且每个柜员都已经为他们面前的人完成了服务。 await q.join(tellers)
  • 然后你让柜员回家,他们现在都是空闲的,而且队列也是空的。 for teller in tellers: teller.cancel()

如果没有task_done(),您无法知道每个柜员是否已经为他们的客人完成了服务。在柜员有顾客时,您不能让柜员回家。


优秀的例子。谢谢! - Jabba

5

有人能够提供一个代码示例(最好使用urllib,文件I/O或其他内容而不是斐波那契数列和打印“Hello”),向我展示如何在实际应用中使用此函数吗?

@user2357112的答案很好地解释了task_done的目的,但缺少所请求的示例。这是一个计算任意数量文件校验和并返回将每个文件名映射到相应校验和的字典的函数。在函数内部,工作被分配给多个线程。

该函数使用Queue.join等待工作者完成其分配的任务,因此安全地将字典返回给调用者。这是一种方便的方式,以等待所有文件被处理,而不仅仅是出队。

import threading, queue, hashlib

def _work(q, checksums):
    while True:
        filename = q.get()
        if filename is None:
            q.put(None)
            break
        try:
            sha = hashlib.sha256()
            with open(filename, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            q.task_done()

def calc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for i in range(1):
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()
    q.put(None)  # tell workers to exit
    return checksums

关于GIL的说明:由于hashlib中的代码在计算校验和时内部释放了GIL,因此使用多个线程与单线程相比可以获得可衡量的速度提升(取决于Python版本,为1.75倍至2倍)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接