我可以配置Queue.Queue
吗,以便它始终接受新项目,并在队列已满时仅删除最旧的项目?
如果不行,标准库中是否有另一个队列类可以实现此功能?
(我不能使用deque,因为我有一个需要同步的生产者/消费者设置。)
在评论中提到的使用条件保护资源访问的示例。
import collections
import threading
import time
queue = collections.deque()
condition = threading.Condition()
def consumer():
condition.acquire()
while True:
while queue:
item = queue.popleft()
condition.release()
# do something with item
print(item)
condition.acquire()
condition.wait()
def push_item(item):
with condition:
queue.append(item)
condition.notify()
# From that point forward, it is just demonstration code to show how to use
def example_producer_thread(*args):
for arg in args:
push_item(arg)
consumer_thread = threading.Thread(target=consumer, name='queue consumer')
consumer_thread.daemon = True # so it does not prevent python from exiting
consumer_thread.start()
for example in [range(0, 10), range(10, 20), range(20, 30)]:
threading.Thread(target=example_producer_thread, args=example).start()
time.sleep(1) # let the consumer thread some time before the script gets killed
consumer()
是消费者线程,它保持空闲状态(不进行轮询),直到其他线程将项目放入队列中。当被唤醒时,它会锁定队列,获取一个项目,解锁队列,处理该项目,直到队列中没有更多项目。然后释放并进入睡眠状态。push_item()
将单个项目推入队列,并通知消费者线程应该唤醒。example_producer_thread
只是将其参数推入队列中。我们启动了其中的三个线程,每个线程在一段数字范围内操作,以便我们可以看到结果。maxlen
即可正常运行。在此过程中,可以封装该功能于一个小类中。更新:请不要使用此方法。正如@spectras所指出的那样,它不能正确同步。
这并不是特别优雅,但是对于我来说似乎可以用于多个写入者。
class QueueLatest(queue.Queue):
def put(self, item):
while True:
try:
super().put(item, block = False)
break
except queue.Full:
_ = self.queue.popleft()
self.queue
对象。这将根据 queue.Queue
如何包装它最终导致崩溃。 - spectras_get
是一个内部方法,不应该从外部使用(因为它假定任何同步都已经由调用者处理)。get
使用 threading.Condition
正确地进行同步。 - spectrassuper().get()
替换 self.queue.popleft()
可以解决这个问题。有什么理由不这样做吗? - NichtJensfrom queue import Queue, Full
class QueueLatest(Queue):
''' customized put'''
def put(self, *args, **kwargs):
try:
super().put(*args, **kwargs)
except Full:
self.queue.popleft()
super().put(*args, **kwargs)
根据@Eric Smith的答案,似乎可以工作。我使用了q = QueueLatest(1),看起来还不错。不太确定它有多健壮或是否存在任何竞争条件等。
self.queue
对象,没有同步。 - spectras
collections.deque
并传入maxlen
参数实现相应的功能。 - Moses Koledoyequeue.get(block=True)
。在底层,我认为这会等待条件变量。如果要在此处改用互斥锁,我将不得不使用由互斥锁保护的queue.get(block=False)
进行忙等待,这将消耗更多的 CPU。 - Alex Flint