Using multiprocessing to share an array of objects in Python


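I am trying to create an array of shared objects (instances of a class) in a Python program, and these objects will be modified by different processes. To do this, I use multiprocessing as follows: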

import ctypes
import multiprocessing
import numpy as np
sh = multiprocessing.RawArray(ctypes.py_object, 10)
f = np.frombuffer(sh, dtype=object)

The error I get is:

Traceback (most recent call last):
  File "<pyshell#14>", line 1, in <module>
    f = np.frombuffer(sh, dtype=object)
ValueError: cannot create an OBJECT array from memory buffer

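My questions are: first, is this the right way to approach this problem in general? Second, where is the mistake in the code above? Thanks in advance.
2 Answers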


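ctypes.py_object represents a PyObject * in C. It is a pointer to the struct that represents a Python object; that struct lives in the private memory of the process and itself contains further pointers. Another process cannot access that memory, so there is no point in trying to share such pointers between processes.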
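A quick way to observe this is to hand an ordinary object to a child process, change it there, and then look at the parent's copy. The following is a minimal sketch; `Counter`, `increment` and `run_demo` are hypothetical names chosen for illustration.

```python
import multiprocessing as mp


class Counter:
    """A plain Python object; it lives in the private memory of one process."""

    def __init__(self):
        self.value = 0


def increment(counter, result_queue):
    # The child works on its own copy of `counter` (pickled for the spawn and
    # forkserver start methods, a copy-on-write copy for fork), so this change
    # stays inside the child.
    counter.value += 1
    result_queue.put(counter.value)


def run_demo():
    counter = Counter()
    result_queue = mp.Queue()
    child = mp.Process(target=increment, args=(counter, result_queue))
    child.start()
    child_value = result_queue.get()  # read the result before join()
    child.join()
    # (value seen in the child, value still held by the parent)
    return child_value, counter.value


if __name__ == "__main__":
    print(run_demo())  # expected: (1, 0)
```

The child reports 1 while the parent still holds 0: each process has its own object, so a pointer to it would be meaningless to any other process.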

See also this answer by Alex Martelli.
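If several processes really need to modify the same objects, one standard-library option (a swapped-in technique, not what this answer proposes) is a manager: the real objects live in a separate server process, and the other processes call their methods through proxies. A minimal sketch under those assumptions; `Item`, `ItemManager`, `worker` and `run_demo` are hypothetical names. Proxies forward method calls rather than attribute access, so the class gets accessor methods, and because the manager's server handles each client connection in its own thread, the update is guarded by a lock.

```python
import threading
from multiprocessing import Process
from multiprocessing.managers import BaseManager


class Item:
    """An ordinary class; the manager's server process holds the real instances."""

    def __init__(self, item_id):
        self.item_id = item_id
        self.count = 0
        self._lock = threading.Lock()  # clients are served by separate threads

    def increment(self):
        with self._lock:
            self.count += 1

    def get_count(self):
        return self.count


class ItemManager(BaseManager):
    pass


# After registration, manager.Item(...) creates an Item inside the server
# process and returns a proxy to it.
ItemManager.register("Item", Item)


def worker(item_proxy):
    # The call is forwarded to the single shared instance in the server.
    item_proxy.increment()


def run_demo(num_items=3, workers_per_item=2):
    with ItemManager() as manager:  # starts the server process
        items = [manager.Item(i) for i in range(num_items)]
        procs = [Process(target=worker, args=(item,))
                 for item in items for _ in range(workers_per_item)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return [item.get_count() for item in items]


if __name__ == "__main__":
    print(run_demo())  # expected: [2, 2, 2]
```

Every method call is a round trip to the server process, so this trades speed for the convenience of genuinely shared objects.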


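Actually, using Array and RawArray (from multiprocessing) is one way to create a shared array in memory that several processes can access. I can write the code above for a shared array of integers, but I run into problems when writing it for an array of objects. For example, if I write:
import ctypes
import multiprocessing as mp
import numpy as np
sh = mp.RawArray(ctypes.c_int, 10)
f = np.frombuffer(sh, dtype=np.intc)  # np.intc matches the size of ctypes.c_int
f[0] = 1
sh[0]
it works seamlessly!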
- user823743
@user823743 Right, but you can't do the same with pointers. - Janne Karila
You're right; fortunately, I found another approach that avoids multiprocessing. Thanks. - user823743
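When each "object" can be reduced to fixed-size fields, one workaround (a swapped-in technique, not taken from either answer) is to share an array of ctypes.Structure records: the shared block then holds the data itself instead of PyObject * pointers, just as in the integer example above. A minimal sketch under that assumption; `Point`, `move_right` and `run_demo` are hypothetical names.

```python
import ctypes
import multiprocessing as mp


class Point(ctypes.Structure):
    # Fixed-size C fields instead of Python object pointers, so the data
    # itself lives in the shared memory block.
    _fields_ = [("x", ctypes.c_double), ("y", ctypes.c_double)]


def move_right(points, dx):
    # Runs in the child; each element is a view into the shared memory,
    # so these writes are visible to the parent.
    for p in points:
        p.x += dx


def run_demo():
    # RawArray takes a ctypes type plus an initializer sequence; the tuples
    # are converted to Point structures.
    points = mp.RawArray(Point, [(0.0, 0.0), (1.0, 2.0), (3.0, 4.0)])
    child = mp.Process(target=move_right, args=(points, 10.0))
    child.start()
    child.join()
    return [(p.x, p.y) for p in points]


if __name__ == "__main__":
    print(run_demo())  # expected: [(10.0, 0.0), (11.0, 2.0), (13.0, 4.0)]
```

RawArray has no lock; if several processes write the same records concurrently, multiprocessing.Array (which wraps the data with a lock) is the safer choice.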

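You might want to use multiprocessing.Queue, onto which you can easily put objects without worrying about their types, as long as they can be pickled. It is also thread-safe and process-safe. Keep in mind that every object put on the queue is pickled, so the receiving process gets a copy, not the original.
Here is a simple example that uses a queue to solve the producer-consumer problem (original source; the Pizza class is my own added bonus).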
from multiprocessing import Process, Queue

class Pizza(object):
    def __init__(self, pizza_num):
        self.pizza_num = pizza_num
        self.num_slices = 8

sentinel = "NO PIZZA"

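def producer(initial_num_pizzas, total_num_pizzas, q):
    """Cooks the remaining Pizzas, puts them on the Queue, then puts the sentinel."""
    print("Producer: I am cooking %s Pizzas and putting them on the Queue!" % (total_num_pizzas - initial_num_pizzas))
    # Start at initial_num_pizzas rather than q.qsize(): the consumer may already have
    # taken some Pizzas, and qsize() is not implemented on some platforms (e.g. macOS).
    for i in range(initial_num_pizzas, total_num_pizzas):
        print("Producer: Behold, for I have cooked Pizza no. %s" % i)
        q.put(Pizza(i))
    q.put(sentinel)  # tells the consumer that no more Pizzas are coming

def consumer(q):
    """Consumes Pizzas until it receives the sentinel; eating sets the number of slices to 0."""
    while True:
        pizza = q.get()
        # Check for the sentinel first: it is a str and has no num_slices attribute.
        if pizza == sentinel:
            break
        print("Consumer: Pizza no. %s was found! It has %s slices, yum!" % (pizza.pizza_num, pizza.num_slices))
        pizza.num_slices = 0  # changes only the consumer's unpickled copy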

if __name__ == '__main__':
    q = Queue()
    total_num_pizzas = 10
    initial_num_pizzas = 4
    ## Let's add some Pizzas beforehand:
    for i in range(0, initial_num_pizzas):
        q.put(Pizza(i))
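    # Use the known count: q.qsize() raises NotImplementedError on some platforms (e.g. macOS).
    print("Main: I have precooked %s Pizzas." % initial_num_pizzas)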

    producer_proc = Process(target=producer, args=(initial_num_pizzas, total_num_pizzas, q))
    consumer_proc = Process(target=consumer, args=(q,))
    producer_proc.start()
    consumer_proc.start()

    q.close()  ## Main is done: no more Pizzas will be added to the Queue from this process.
    q.join_thread()

    producer_proc.join()
    consumer_proc.join()

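Here is some sample output. If you run it, the producer's and consumer's print statements may interleave differently, because the two processes run in parallel and their execution order is nondeterministic.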
Main: I have precooked 4 Pizzas.
Producer: I am cooking 6 Pizzas and putting them on the Queue!
Producer: Behold, for I have cooked Pizza no. 4
Producer: Behold, for I have cooked Pizza no. 5
Producer: Behold, for I have cooked Pizza no. 6
Producer: Behold, for I have cooked Pizza no. 7
Consumer: Pizza no. 0 was found! It has 8 slices, yum!
Consumer: Pizza no. 1 was found! It has 8 slices, yum!
Producer: Behold, for I have cooked Pizza no. 8
Consumer: Pizza no. 2 was found! It has 8 slices, yum!
Producer: Behold, for I have cooked Pizza no. 9
Consumer: Pizza no. 3 was found! It has 8 slices, yum!
Consumer: Pizza no. 4 was found! It has 8 slices, yum!
Consumer: Pizza no. 5 was found! It has 8 slices, yum!
Consumer: Pizza no. 6 was found! It has 8 slices, yum!
Consumer: Pizza no. 7 was found! It has 8 slices, yum!
Consumer: Pizza no. 8 was found! It has 8 slices, yum!
Consumer: Pizza no. 9 was found! It has 8 slices, yum!

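Note that you should mark the end of the queue with a sentinel. I used "NO PIZZA" here, but it can be any value that the consumer can recognize and that cannot be mistaken for a real item.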
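Two details of the sentinel are worth spelling out. First, iter(q.get, sentinel) is a compact way to keep reading until the sentinel arrives. Second, because everything on a multiprocessing.Queue is pickled, the sentinel has to compare equal by value: a string or None works, while a bare object() would not, since the receiver gets an unpickled copy that is a different object. The sketch below also sends the eaten Pizzas back on a second queue, because changes made in the consumer otherwise never reach the parent. It is a minimal sketch; `SENTINEL`, `eater` and `run_demo` are hypothetical names.

```python
from multiprocessing import Process, Queue

SENTINEL = "NO PIZZA"  # a str compares equal to its unpickled copy


class Pizza:
    def __init__(self, pizza_num):
        self.pizza_num = pizza_num
        self.num_slices = 8


def eater(tasks, results):
    # iter(callable, sentinel) calls tasks.get() until it returns a value
    # equal to SENTINEL.
    for pizza in iter(tasks.get, SENTINEL):
        pizza.num_slices = 0  # modifies the worker's copy only...
        results.put(pizza)    # ...so send the modified copy back
    results.put(SENTINEL)


def run_demo(n=3):
    tasks, results = Queue(), Queue()
    worker = Process(target=eater, args=(tasks, results))
    worker.start()
    for i in range(n):
        tasks.put(Pizza(i))
    tasks.put(SENTINEL)
    eaten = list(iter(results.get, SENTINEL))  # drain the queue before join()
    worker.join()
    return [(p.pizza_num, p.num_slices) for p in eaten]


if __name__ == "__main__":
    print(run_demo())  # expected: [(0, 0), (1, 0), (2, 0)]
```

With a single worker the results come back in FIFO order; with several workers the order would depend on scheduling.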

Content provided by Stack Overflow (see the original English post).