如何在Python中实现一个多进程优先队列?

6

你知道如何在Python中实现一个多进程优先队列吗?


对于简洁的非分布式实现(使用像Alex Martelli所讲的RPC一样),请参见https://dev59.com/6V8e5IYBdhLWcg3w0dJJ#25328987。 - Ron Kaminsky
6个回答

6
哎呀,这并不像简单地更改好老的Queue.Queue中队列的调度那样简单:实际上,后者是根据模板方法模式设计的,可轻松覆盖挂钩方法_put和/或_get以更改队列调度(2.6提供了显式的LIFO和优先级实现,但即使在早期版本的Python中也很容易做到)。
对于多进程情况(多个读者,多个写入者),我看不到除放弃队列的分布式本质之外如何实现优先级队列的解决方案。指定一个特殊的辅助进程来处理队列,向它发送(基本上是)RPC来创建具有指定调度的队列,在其中进行放置和获取,并获取有关它的信息等。所以每个进程都知道辅助进程的位置(主机和端口,例如)等通常问题(如果进程始终由主进程在启动时生成,则更容易)。一个相当大的问题,特别是如果想要通过良好的性能来解决它,需要保护辅助进程的崩溃(需要将数据复制到从属进程中,如果主进程崩溃,则在从属进程中进行分布式“主选举”等),等等。从头开始做听起来就像是一个博士学位的工作。可以从Johnson的工作开始,或者依附于一些非常通用的方法,比如ActiveMQ
某些特殊情况(例如单个读者,单个写入者)可能更容易,并且对于其有限的应用领域来说更快;但是,对于该有限区域,应制定一个非常具体的规范--结果将不构成(通用)“多进程队列”,而仅适用于给定的约束要求集。

"Johnson's work"链接已损坏... - Radio Controlled

2
有一个错误导致真正的FIFO无法实现。
请阅读这里
构建优先级队列多进程设置的另一种替代方法肯定会很棒!

是的,但有没有一个好的方法来实现优先队列?这可行吗?(除非我必须专门编写一个进程来管理,但我认为这将更加困难和容易出错) - phroxy
我还没有找到方法,不过我会随时通知你的。 - Kevin Boyd

1

我有同样的使用情况,但是优先级数量是有限的。

我最终做的是为每个优先级创建一个队列,我的进程工作者将尝试从这些队列中获取项目,从最重要的队列开始到次重要的队列(当队列为空时,从一个队列移动到另一个队列)。


这似乎是页面上最合理可行的答案。 - speedplane
3
想了想,这并不容易完全做到。例如,当最高优先级队列为空且你前往下一个队列时,如何避免在检查下一个最高队列时最高优先级队列被填满而出现竞争条件? - speedplane

1

虽然这不是一个答案,但也许它可以帮助您开发一个多进程队列。

这是我使用Python的Array编写的一个简单的优先级队列类:

class PriorityQueue():
    """A basic priority queue that dequeues items with the smallest priority number."""
    def __init__(self):
        """Initializes the queue with no items in it."""
        self.array = []
        self.count = 0

    def enqueue(self, item, priority):
        """Adds an item to the queue."""
        self.array.append([item, priority])
        self.count += 1

    def dequeue(self):
        """Removes the highest priority item (smallest priority number) from the queue."""
        max = -1
        dq = 0
        if(self.count > 0):
            self.count -= 1

            for i in range(len(self.array)):
                if self.array[i][1] != None and self.array[i][1] > max:
                    max = self.array[i][1]

            if max == -1:
                return self.array.pop(0)
            else:
                for i in range(len(self.array)):
                    if self.array[i][1] != None and self.array[i][1] <= max:
                        max = self.array[i][1]
                        dq = i
                return self.array.pop(dq)

    def requeue(self, item, newPrio):
        """Changes specified item's priority."""
        for i in range(len(self.array)):
            if self.array[i][0] == item:
                self.array[i][1] = newPrio
                break

    def returnArray(self):
        """Returns array representation of the queue."""
        return self.array

    def __len__(self):
        """Returnes the length of the queue."""
        return self.count

0
根据您的需求,您可以以多种方式使用操作系统和文件系统。队列会增长到多大?它需要有多快的速度?如果队列可能很大,但您愿意为每个队列访问打开几个文件,则可以使用BTree实现来存储队列,并使用文件锁定来强制执行独占访问。虽然速度较慢,但非常稳健。
如果队列将保持相对较小并且需要快速访问,则可能可以在某些操作系统上使用共享内存...
如果队列很小(1000个条目)并且不需要非常快速,则可以使用简单的包含数据的文件目录及文件锁定。如果小而慢没问题的话,这将是我的首选。
如果队列可能很大并且您希望平均速度很快,则可能应该像Alex建议的那样使用专用服务器进程。但这很麻烦。
您的性能和大小要求是什么?

0
受@user211505建议的启发,我快速地组合了一些东西。
请注意,这不是解决多进程生产环境中优先队列困难问题的完整方案。然而,如果你只是在玩弄或需要一个短期项目的东西,这可能会符合要求:
from time import sleep
from datetime import datetime
from Queue import Empty
from multiprocessing import Queue as ProcessQueue

class SimplePriorityQueue(object):
    '''
    Simple priority queue that works with multiprocessing. Only a finite number 
    of priorities are allowed. Adding many priorities slow things down. 

    Also: no guarantee that this will pull the highest priority item 
    out of the queue if many items are being added and removed. Race conditions
    exist where you may not get the highest priority queue item.  However, if 
    you tend to keep your queues not empty, this will be relatively rare.
    '''
    def __init__(self, num_priorities=1, default_sleep=.2):
        self.queues = []
        self.default_sleep = default_sleep
        for i in range(0, num_priorities):
            self.queues.append(ProcessQueue())

    def __repr__(self):
        return "<Queue with %d priorities, sizes: %s>"%(len(self.queues), 
                    ", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()), 
                                enumerate(self.queues))))

    qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues))

    def get(self, block=True, timeout=None):
        start = datetime.utcnow()
        while True:
            for q in self.queues:
                try:
                    return q.get(block=False)
                except Empty:
                    pass
            if not block:
                raise Empty
            if timeout and (datetime.utcnow()-start).total_seconds > timeout:
                raise Empty

            if timeout:
                time_left = (datetime.utcnow()-start).total_seconds - timeout
                sleep(time_left/4)
            else:
                sleep(self.default_sleep)

    get_nowait = lambda(self): self.get(block=False)

    def put(self, priority, obj, block=False, timeout=None):
        if priority < 0 or priority >= len(self.queues):
            raise Exception("Priority %d out of range."%priority)
        # Block and timeout don't mean much here because we never set maxsize
        return self.queues[priority].put(obj, block=block, timeout=timeout)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接