使用SyncManager类实现Python多进程优先级队列的队列函数

3
我希望您能够实现多进程优先队列。我找到了这个答案: Python 2.7.6中使用多进程PriorityQueue的奇怪行为,作者是Dano
在我实现后,我可以使用.get()和.put()函数来操作我的优先队列,但当我使用.queue来打印当前队列中的元素时,它会给出一个错误。
代码如下:
 class MyManager(SyncManager):   
    pass

def get_manager():
    MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue
    m = MyManager()
    m.start()
    return m

m = get_manager()
call= m.PriorityQueue()
for i in range(5):
    call.put(i)

print(call.queue)

错误提示:AttributeError: 'AutoProxy[PriorityQueue]' 对象没有属性 'queue'

我看了一下 Python 的 SyncManager 文档,并修改了我的代码。

新的代码:

class MyManager(SyncManager):   
    pass

def get_manager():
    MyManager.register("PriorityQueue", PriorityQueue,exposed=['put','get','queue'])  # Register a shared PriorityQueue
    m = MyManager()
    m.start()
    return m

m = get_manager()
call= m.PriorityQueue()
for i in range(5):
    call.put(i)

print(call.queue)

现在的输出是:-
<bound method AutoProxy[PriorityQueue].queue of <AutoProxy[PriorityQueue] object, typeid 'PriorityQueue' at 0x7ff3b48f2dd0>>

我仍然无法获取队列中的元素,我阅读了有关register函数的method_to_typeid属性来映射exposed中提到的函数的返回类型的方法,但我不知道如何使用它。
有人可以帮助我吗?这样我就可以在不从队列中弹出元素的情况下打印队列的元素。

“queue” 似乎是一个方法,因此您需要调用它。尝试使用“call.queue()”(如果它需要任何参数,则可能会给您另一个错误)。 - Graipher
queue 不是可调用的。因此,我们在使用 queue 时不需要括号 (),无论是普通队列、多进程队列还是其他任何类型的队列。 - Arshad Ahmad
1个回答

4
你只能通过代理使用引用的方法。由于PriorityQueue().queue不是一个方法,而是一个实例属性,你需要提供一个可以返回该属性值的方法。 下面的示例选择了使用子类化PriorityQueue的通用get_attribute方法。
# Python 3.7.1
from queue import PriorityQueue
from multiprocessing.managers import SyncManager
from multiprocessing import Process


SENTINEL = None


class MyPriorityQueue(PriorityQueue):
    def get_attribute(self, name):
        return getattr(self, name)


class MyManager(SyncManager):
    pass


def get_manager():
    MyManager.register("PriorityQueue", MyPriorityQueue)
    m = MyManager()
    m.start()
    return m


def f(q):
    for item in iter(lambda: q.get()[1], SENTINEL):
        print(item)
    print(f'queue: {q.get_attribute("queue")}')


if __name__ == '__main__':

    m = get_manager()
    pq = m.PriorityQueue()

    tasks = enumerate([f'item_{i}' for i in range(5)] + [SENTINEL])

    for task in tasks:
        pq.put(task)

    print(f'queue: {pq.get_attribute("queue")}')
    print(f'maxsize: {pq.get_attribute("maxsize")}')

    p = Process(target=f, args=(pq,))
    p.start()
    p.join()

示例输出:
queue: [(0, 'item_0'), (1, 'item_1'), (2, 'item_2'), (3, 'item_3'), (4, 'item_4'), (5, None)]
maxsize: 0
item_0
item_1
item_2
item_3
item_4
queue: []

谢谢,运行得非常完美,也解除了我的疑虑。 - Arshad Ahmad

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接