Python可迭代队列

12
我需要知道什么时候队列已经关闭并且不会再有更多的项目,以便我可以结束迭代。
我通过在队列中放置一个哨兵来实现:
from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item

鉴于这是队列的一个非常常见的用法,难道没有任何内置实现吗?


我要么使用哨兵,要么在线程中使用标志来停止对队列的迭代。对于后者,我通常会等待一段时间。 - jdi
3个回答

15
一个标记是生产者发送消息表示没有更多的队列任务的合理方式。
顺便提一句,使用 iter() 的两个参数形式可以大大简化您的代码。
from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)

4

多进程模块有自己的版本Queue,其中包含close方法。我不确定它在线程中如何工作,但值得一试。我不明白为什么它不能起到相同的作用:

from multiprocessing import Queue

q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()

你可以捕获 IOError 作为关闭信号。

测试

from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.

说实话,这与在Queue.Queue上设置标志没有太大区别。multiprocessing.Queue只是将关闭的文件描述符用作标记:

from Queue import Queue

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.

0
一个老问题,self._sentinel = Object() 的变体可以解决。在2021年重新审视它时,我建议使用concurrent.futures并将None作为您的标记:
# Note: this is Python 3.8+ code                                                                                                                                                   

import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q,i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else                                                                                                                      
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous                                                                                                              
            time.sleep(random.random()/100)

    except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break

    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor() as executor:

    # create a queue with numbers to add up                                                                                                                                        
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))

    # kick off the threads                                                                                                                                                         
    future_partials = executor.map(worker, [(q,i) for i in range(MAX_THREADS)])

    # they'll be done more or less instantly, but we'll make them wait                                                                                                             
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)

    # threads are still available for more work!                                                                                                                                   
    for i in range(MAX_RANGE):
        q.put(i)

    print("Finished giving them another batch, this time we're not sleeping")

    # now we tell them all to wrap it up                                                                                                                                           
    q.put(None)
    # this will nicely catch the outputs                                                                                                                                           
    sum = functools.reduce(lambda x, y: x+y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE}")

# Starting thread 0                                                                                                                                                                
# Starting thread 1                                                                                                                                                                
# Starting thread 2                                                                                                                                                                
# Starting thread 3                                                                                                                                                                
# Starting thread 4                                                                                                                                                                
# Starting thread 5                                                                                                                                                                
# Starting thread 6                                                                                                                                                                
# Starting thread 7                                                                                                                                                                
# Starting thread 8                                                                                                                                                                
# Starting thread 9                                                                                                                                                                
# Starting thread 10                                                                                                                                                               
# Starting thread 11                                                                                                                                                               
# Threads launched with first batch ... sleeping 2 seconds                                                                                                                         
# Finished giving them another batch, this time we're not sleeping                                                                                                                 
# Thread 0 is done, saw a total of 175 numbers to add up                                                                                                                           
# Thread 3 is done, saw a total of 178 numbers to add up                                                                                                                           
# Thread 11 is done, saw a total of 173 numbers to add up                                                                                                                          
# Thread 4 is done, saw a total of 177 numbers to add up                                                                                                                           
# Thread 9 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 1 is done, saw a total of 172 numbers to add up                                                                                                                           
# Thread 7 is done, saw a total of 162 numbers to add up                                                                                                                           
# Thread 10 is done, saw a total of 161 numbers to add up                                                                                                                          
# Thread 5 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 2 is done, saw a total of 157 numbers to add up                                                                                                                           
# Thread 6 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 8 is done, saw a total of 186 numbers to add up                                                                                                                           
# Got total sum 1047552 (correct answer is 1047552      

                                                                                                                       

请注意,事实上的“主线程”只需要将 None 推入队列中,类似于条件变量的“信号”,所有线程都会接收到并传播它。
另外,这不使用比标准(线程安全)队列更重的多处理器 Queue。上述代码还有一个好处,就是可以轻松地修改为使用 ProcessPoolExecutor 或两者的混合体(在任何情况下,您都需要使用 multiprocessing.Queue)。
(副注:一般来说,如果在任何给定的 Python 生成中需要使用类来解决“基本”问题,则通常在更新的版本中有新选项。)
(第二个侧面说明:代码之所以是 Python 3.8+ 是因为我喜欢 赋值表达式,符合以上侧面说明的规定,解决了如何从列表初始化队列的历史性问题,而无需采用非功能性解决方案。)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接