Determining when a thread pool has finished processing a queue

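I'm trying to implement a thread pool that processes a task queue, using ThreadPool and Queue. It starts with an initial queue of tasks, and each task can push further tasks onto the queue. The problem is that I don't know how to block until the queue is empty and the thread pool has finished processing, while still checking the queue and submitting any new tasks to the pool. I can't simply call ThreadPool.join(), because I need to keep the pool open to accept new tasks.
For example: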
from multiprocessing.pool import ThreadPool
from Queue import Queue
from random import random
import time
import threading

queue = Queue()
pool = ThreadPool()
stdout_lock = threading.Lock()

def foobar_task():
    with stdout_lock: print "task called" 
    if random() > .25:
        with stdout_lock: print "task appended to queue"
        queue.put(foobar_task)  # Queue has no append(); put() enqueues the task
    time.sleep(1)

# set up initial queue
for n in range(5):
    queue.put(foobar_task)

# run the thread pool
while not queue.empty():
    task = queue.get() 
    pool.apply_async(task)

with stdout_lock: print "pool is closed"
pool.close()
pool.join()

This outputs:
pool is closed
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue

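The code exits the while loop before any foobar_task has added itself to the queue, so the tasks added later are never submitted to the thread pool. I can't find any way to determine whether the pool still has active worker threads. I tried the following: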

while not queue.empty() or any(worker.is_alive() for worker in pool._pool):
    if not queue.empty():
        task = queue.get() 
        pool.apply_async(task)
    else:   
        with stdout_lock: print "waiting for worker threads to complete..."
        time.sleep(1)

But worker.is_alive() seems to always return True, so this goes into an infinite loop.
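
The loop above likely never ends because pool workers are long-lived: an idle worker blocks waiting for its next task instead of exiting. Below is a minimal Python 3 sketch of that behavior. It assumes CPython's ThreadPool and its private _pool attribute (the same one used in the loop above), and idle_workers_alive is a hypothetical helper name.

```python
from multiprocessing.pool import ThreadPool


def idle_workers_alive(num_workers=2):
    """Report is_alive() for each worker of a pool that was given no tasks."""
    pool = ThreadPool(num_workers)
    try:
        # No task has been submitted, yet every worker thread is already
        # running and waiting for work.
        # NOTE: pool._pool is a private implementation detail of CPython.
        return [worker.is_alive() for worker in pool._pool]
    finally:
        # Closing and joining the pool is what finally stops the workers.
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(idle_workers_alive())
```

So is_alive() says whether a worker thread exists, not whether it is busy, which makes it unsuitable as a "work is finished" signal.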

Is there a better way?

1 Answer

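  1. Call queue.task_done() after each task has been processed.
  2. You can then call queue.join() to block the main thread until all tasks are done.
  3. To terminate the worker threads, put a sentinel (e.g. None) in the queue, and have foobar_task break out of its while loop when it receives the sentinel.
  4. I think this is easier to implement with threading.Thread than with ThreadPool.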
import random
import time
import threading
import logging
import Queue

logger=logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

sentinel=None
queue = Queue.Queue()
num_threads = 5

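def foobar_task(queue):
    # Worker loop: keeps pulling items until it receives the sentinel.
    while True:
        n = queue.get()
        logger.info('task called: {n}'.format(n=n))
        if n is sentinel: break
        n=random.random()
        if n > .25:
            # Enqueue follow-up work before marking the current item done,
            # so queue.join() cannot return while work is still pending.
            logger.info("task appended to queue")
            queue.put(n)
        queue.task_done()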

# set up initial queue
for i in range(num_threads):
    queue.put(i)

threads=[threading.Thread(target=foobar_task,args=(queue,))
         for n in range(num_threads)]
for t in threads:
    t.start()

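# block until every queued item has been marked done
queue.join()
# then send one sentinel per worker so that each thread exits its loop
for i in range(num_threads):
    queue.put(sentinel)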

for t in threads:
    t.join()
logger.info("threads are closed")
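
If keeping ThreadPool and callable tasks (as in the question) is preferred, one possible alternative is to count in-flight tasks yourself. The following is a minimal Python 3 sketch of that idea, not part of the answer above. TaskTracker, demo, and the depth parameter are hypothetical names introduced only for illustration.

```python
import threading
from multiprocessing.pool import ThreadPool


class TaskTracker:
    """Hypothetical helper: counts tasks submitted but not yet finished."""

    def __init__(self, pool):
        self._pool = pool
        self._pending = 0
        self._cond = threading.Condition()

    def submit(self, func, *args):
        # Count the task before handing it to the pool, so the counter
        # cannot drop to zero while work is still outstanding.
        with self._cond:
            self._pending += 1
        self._pool.apply_async(self._run, (func,) + args)

    def _run(self, func, *args):
        try:
            func(*args)  # the task may call submit() to add more work
        finally:
            # Decrement only after the task (and its submissions) finished.
            with self._cond:
                self._pending -= 1
                if self._pending == 0:
                    self._cond.notify_all()

    def wait(self):
        # Block until every submitted task, including children, is done.
        with self._cond:
            while self._pending:
                self._cond.wait()


def demo(depth=3):
    """Each task spawns two children until the given depth is reached."""
    results = []
    lock = threading.Lock()
    pool = ThreadPool(4)
    tracker = TaskTracker(pool)

    def task(level):
        with lock:
            results.append(level)
        if level < depth:
            tracker.submit(task, level + 1)
            tracker.submit(task, level + 1)

    tracker.submit(task, 0)
    tracker.wait()   # returns only once no task is pending
    pool.close()     # safe now: no further tasks will be submitted
    pool.join()
    return results


if __name__ == "__main__":
    print(len(demo()))
```

Because a task submits its children before its own count is released, the counter reaches zero only when the whole tree of work has finished. Note that exceptions raised inside a task are swallowed by apply_async in this sketch; real code would probably want to collect or log them.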

Content provided by Stack Overflow.