给Python的Queue.join()方法添加超时参数。

19

我希望能够在调用未返回的情况下,使用join()方法加入Queue类。最好的方法是什么?是否可以通过子类化queue或使用元类来实现?


1
你需要确保所有工作线程以task_done()结束。 - tuergeist
4个回答

24

继承 Queue 可能是最好的方法。下面的代码应该可以工作(未经测试):

def join_with_timeout(self, timeout):
    self.all_tasks_done.acquire()
    try:
        endtime = time() + timeout
        while self.unfinished_tasks:
            remaining = endtime - time()
            if remaining <= 0.0:
                raise NotFinished
            self.all_tasks_done.wait(remaining)
    finally:
        self.all_tasks_done.release()

1
谢谢!你从哪里得到关于 all_task_done 的信息?我在 http://docs.python.org/library/queue.html#module-Queue 中查找,但是我没有看到任何提及该成员的内容... - olamundo
4
你可以阅读队列的源代码。它实现了 putgettimeout 参数,扩展 join 使用类似的方法也非常容易。 - Lukáš Lalinský
1
有没有想过为什么all_tasks_done没有被记录?这可能意味着该方法在任何版本中都可能被更改/破坏。 - Chris W.
这个怎么实现?你是调用q.join_with_timeout而不是q.join()吗? - Source Matters

18

join() 方法的作用是等待所有任务完成。如果您不关心任务是否已经完成,可以定期轮询未完成任务的数量:

stop = time() + timeout
while q.unfinished_tasks and time() < stop:
    sleep(1)

当任务完成或超时时间到达时,此循环将会结束。

Raymond


0

当我尝试实现被接受的答案时,似乎all_tasks_done不再被定义。一个快速的解决方案是在JoinableQueue.join中调用wait()函数的timeout

例如,在JoinableQueue子类中覆盖join函数将在等待操作上添加15秒的超时:

def join(self):
    with self._cond:
        if not self._unfinished_tasks._semlock._is_zero():
            self._cond.wait(15)

这个怎么实现的? - free_123
@free_123 哪个部分?答案中的代码应该放在一个新类中,该类继承自JoinableQueue,以定义一种新类型的队列,您应该在代码中使用它。 - peppie

0

首先,您应确保队列中的所有工作线程都使用 task_done() 退出。

要使用 Queue 实现超时功能,可以将 Queue 的代码封装在一个线程中,并使用 Thread.join([timeout]) 为该线程添加超时。

未经测试的示例以概述我建议的内容

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

def queuefunc():
    q = Queue()
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done

t = Thread(target=queuefunc)
t.start()
t.join(100) # timeout applies here

t.join(100) 将为整个作业设置超时。这对于我的用例不起作用,因为我需要在几个小时内填充队列,并在完成加载源后才调用 q.join()。然后,我应该设置一个更短的超时时间,以捕获那些由于各种原因(包括错误)导致工作人员未能调用 q.get() 足够多次或 q.task_done() 相同次数的情况。 - Elias Hasle

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接