我希望能够在调用未返回的情况下,使用join()方法加入Queue类。最好的方法是什么?是否可以通过子类化queue或使用元类来实现?
继承 Queue
可能是最好的方法。下面的代码应该可以工作(未经测试):
def join_with_timeout(self, timeout):
self.all_tasks_done.acquire()
try:
endtime = time() + timeout
while self.unfinished_tasks:
remaining = endtime - time()
if remaining <= 0.0:
raise NotFinished
self.all_tasks_done.wait(remaining)
finally:
self.all_tasks_done.release()
put
和 get
的 timeout
参数,扩展 join
使用类似的方法也非常容易。 - Lukáš Lalinskýall_tasks_done
没有被记录?这可能意味着该方法在任何版本中都可能被更改/破坏。 - Chris W.join() 方法的作用是等待所有任务完成。如果您不关心任务是否已经完成,可以定期轮询未完成任务的数量:
stop = time() + timeout
while q.unfinished_tasks and time() < stop:
sleep(1)
当任务完成或超时时间到达时,此循环将会结束。
Raymond
当我尝试实现被接受的答案时,似乎all_tasks_done
不再被定义。一个快速的解决方案是在JoinableQueue.join
中调用wait()
函数的timeout
。
例如,在JoinableQueue
子类中覆盖join
函数将在等待操作上添加15秒的超时:
def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait(15)
首先,您应确保队列中的所有工作线程都使用 task_done()
退出。
要使用 Queue
实现超时功能,可以将 Queue 的代码封装在一个线程中,并使用 Thread.join([timeout])
为该线程添加超时。
未经测试的示例以概述我建议的内容
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
def queuefunc():
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.setDaemon(True)
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
t = Thread(target=queuefunc)
t.start()
t.join(100) # timeout applies here
t.join(100)
将为整个作业设置超时。这对于我的用例不起作用,因为我需要在几个小时内填充队列,并在完成加载源后才调用 q.join()
。然后,我应该设置一个更短的超时时间,以捕获那些由于各种原因(包括错误)导致工作人员未能调用 q.get()
足够多次或 q.task_done()
相同次数的情况。 - Elias Hasle