你知道如何在Python中实现一个多进程优先队列吗?
你知道如何在Python中实现一个多进程优先队列吗?
Queue.Queue
中队列的调度那样简单:实际上,后者是根据模板方法模式设计的,可轻松覆盖挂钩方法_put
和/或_get
以更改队列调度(2.6提供了显式的LIFO和优先级实现,但即使在早期版本的Python中也很容易做到)。我有同样的使用情况,但是优先级数量是有限的。
我最终做的是为每个优先级创建一个队列,我的进程工作者将尝试从这些队列中获取项目,从最重要的队列开始到次重要的队列(当队列为空时,从一个队列移动到另一个队列)。
虽然这不是一个答案,但也许它可以帮助您开发一个多进程队列。
这是我使用Python的Array编写的一个简单的优先级队列类:
class PriorityQueue():
"""A basic priority queue that dequeues items with the smallest priority number."""
def __init__(self):
"""Initializes the queue with no items in it."""
self.array = []
self.count = 0
def enqueue(self, item, priority):
"""Adds an item to the queue."""
self.array.append([item, priority])
self.count += 1
def dequeue(self):
"""Removes the highest priority item (smallest priority number) from the queue."""
max = -1
dq = 0
if(self.count > 0):
self.count -= 1
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] > max:
max = self.array[i][1]
if max == -1:
return self.array.pop(0)
else:
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] <= max:
max = self.array[i][1]
dq = i
return self.array.pop(dq)
def requeue(self, item, newPrio):
"""Changes specified item's priority."""
for i in range(len(self.array)):
if self.array[i][0] == item:
self.array[i][1] = newPrio
break
def returnArray(self):
"""Returns array representation of the queue."""
return self.array
def __len__(self):
"""Returnes the length of the queue."""
return self.count
from time import sleep
from datetime import datetime
from Queue import Empty
from multiprocessing import Queue as ProcessQueue
class SimplePriorityQueue(object):
'''
Simple priority queue that works with multiprocessing. Only a finite number
of priorities are allowed. Adding many priorities slow things down.
Also: no guarantee that this will pull the highest priority item
out of the queue if many items are being added and removed. Race conditions
exist where you may not get the highest priority queue item. However, if
you tend to keep your queues not empty, this will be relatively rare.
'''
def __init__(self, num_priorities=1, default_sleep=.2):
self.queues = []
self.default_sleep = default_sleep
for i in range(0, num_priorities):
self.queues.append(ProcessQueue())
def __repr__(self):
return "<Queue with %d priorities, sizes: %s>"%(len(self.queues),
", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()),
enumerate(self.queues))))
qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues))
def get(self, block=True, timeout=None):
start = datetime.utcnow()
while True:
for q in self.queues:
try:
return q.get(block=False)
except Empty:
pass
if not block:
raise Empty
if timeout and (datetime.utcnow()-start).total_seconds > timeout:
raise Empty
if timeout:
time_left = (datetime.utcnow()-start).total_seconds - timeout
sleep(time_left/4)
else:
sleep(self.default_sleep)
get_nowait = lambda(self): self.get(block=False)
def put(self, priority, obj, block=False, timeout=None):
if priority < 0 or priority >= len(self.queues):
raise Exception("Priority %d out of range."%priority)
# Block and timeout don't mean much here because we never set maxsize
return self.queues[priority].put(obj, block=block, timeout=timeout)