从Python的Queue.Queue中删除最旧的项目(带同步)

5

我可以配置Queue.Queue吗,以便它始终接受新项目,并在队列已满时仅删除最旧的项目?

如果不行,标准库中是否有另一个队列类可以实现此功能?

(我不能使用deque,因为我有一个需要同步的生产者/消费者设置。)


3
你可以使用 collections.deque 并传入 maxlen 参数实现相应的功能。 - Moses Koledoye
啊,但我需要它同步。看起来这并不像只是用互斥锁保护那么简单,因为我想在消费者线程上阻塞,直到有一个可用的项目(而不是持续轮询)。 - Alex Flint
我也想到了。我已经重新打开了。 - Moses Koledoye
@spectras 我目前在消费者循环中使用 queue.get(block=True)。在底层,我认为这会等待条件变量。如果要在此处改用互斥锁,我将不得不使用由互斥锁保护的 queue.get(block=False) 进行忙等待,这将消耗更多的 CPU。 - Alex Flint
@ spectras 所以使用“双向队列”思想,我该如何创建一个线程,等待直到有一个可用项目,然后对其进行处理,然后等待直到另一个可用项目,以此类推? - Alex Flint
显示剩余2条评论
3个回答

4

在评论中提到的使用条件保护资源访问的示例。

import collections
import threading
import time

queue = collections.deque()
condition = threading.Condition()

def consumer():
    condition.acquire()
    while True:
        while queue:
            item = queue.popleft()
            condition.release()
            # do something with item
            print(item)
            condition.acquire()
        condition.wait()

def push_item(item):
    with condition:
        queue.append(item)
        condition.notify()

# From that point forward, it is just demonstration code to show how to use

def example_producer_thread(*args):
    for arg in args:
        push_item(arg)

consumer_thread = threading.Thread(target=consumer, name='queue consumer')
consumer_thread.daemon = True  # so it does not prevent python from exiting
consumer_thread.start()

for example in [range(0, 10), range(10, 20), range(20, 30)]:
    threading.Thread(target=example_producer_thread, args=example).start()

time.sleep(1) # let the consumer thread some time before the script gets killed

核心内容如下:
  • consumer() 是消费者线程,它保持空闲状态(不进行轮询),直到其他线程将项目放入队列中。当被唤醒时,它会锁定队列,获取一个项目,解锁队列,处理该项目,直到队列中没有更多项目。然后释放并进入睡眠状态。
  • push_item() 将单个项目推入队列,并通知消费者线程应该唤醒。
其余代码只是为了使其成为可工作的示例。 example_producer_thread 只是将其参数推入队列中。我们启动了其中的三个线程,每个线程在一段数字范围内操作,以便我们可以看到结果。
只需向队列添加maxlen即可正常运行。在此过程中,可以封装该功能于一个小类中。

2

更新:请不要使用此方法。正如@spectras所指出的那样,它不能正确同步。

这并不是特别优雅,但是对于我来说似乎可以用于多个写入者。

class QueueLatest(queue.Queue):
    def put(self, item):
        while True:
            try:
                super().put(item, block = False)
                break
            except queue.Full:
                _ = self.queue.popleft()

1
您正在以非同步的方式访问底层的 self.queue 对象。这将根据 queue.Queue 如何包装它最终导致崩溃。 - spectras
1
_get 是一个内部方法,不应该从外部使用(因为它假定任何同步都已经由调用者处理)。get 使用 threading.Condition 正确地进行同步。 - spectras
1
呵呵,加上评论后它实际上提供了价值。未采取的路径以及为什么不采取它们的详细信息与采取的路径一样有价值。 :) - spectras
@spectras 看起来用 super().get() 替换 self.queue.popleft() 可以解决这个问题。有什么理由不这样做吗? - NichtJens
啊,当然……肯定正确。我认为有获取Queue.mutex的选项,这应该足够了吧?示例 - NichtJens
显示剩余3条评论

0
from queue import Queue, Full

class QueueLatest(Queue):
    ''' customized put'''
    def put(self, *args, **kwargs):
        try:
            super().put(*args, **kwargs)
        except Full:
            self.queue.popleft()
            super().put(*args, **kwargs)

根据@Eric Smith的答案,似乎可以工作。我使用了q = QueueLatest(1),看起来还不错。不太确定它有多健壮或是否存在任何竞争条件等。


与Eric Smith的答案相同的评论。这使用底层的self.queue对象,没有同步。 - spectras

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接