多进程共享不可序列化对象的方法

19

有三个可能是重复的问题(但过于具体):

回答这个问题可以回答其他三个问题。希望我表述清楚:

一旦我在由多进程创建的某个进程中创建了一个对象:

  1. 我如何将对该对象的引用传递给另一个进程?
  2. (不太重要)当我持有一个引用时,如何确保该进程不会死亡?

示例1(已解决)

from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]

示例2

假设f返回一个具有可变状态的对象。应该可以从其他进程访问此相同的对象。

示例3

我有一个具有打开文件和锁定的对象-如何授权其他进程访问?

提醒

我不希望出现特定错误,也不需要针对此特定用例的解决方案。解决方案应足够通用,以便在进程之间共享不可移动的对象。这些对象可能在任何进程中创建。也可以是使所有对象可移动并保留其标识的解决方案。

欢迎任何提示,任何部分解决方案或指向如何实现解决方案的代码片段都有价值。因此,我们可以一起创建解决方案。

这里是一个尝试解决这个问题,但没有使用多处理:https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

你想让其他进程如何处理引用?

可以将引用传递给使用multiprocessing(重复3次)创建的任何其他进程。可以访问属性,调用引用。访问的属性可能是代理,也可能不是。

仅使用代理有什么问题?

也许没有问题,但有挑战。我的印象是代理有一个管理器,而管理器有自己的进程,因此无法序列化并传输不可序列化的对象(使用StacklessPython / fork部分解决)。此外,存在特殊对象的代理-难以但不是不可能为所有对象构建代理(可解决)。

解决方案?-代理+管理器?

Eric Urban表明序列化不是问题。真正的挑战在于示例2&3:状态的同步。我的解决方案想法是为管理器创建一个特殊的代理类。这个代理类

  1. 为不可序列化的对象提供构造函数
  2. 获取可序列化对象并将其传输到管理器进程。
  3. (问题)根据1,不可序列化的对象必须在管理器进程中创建。

1
问题应该进行编辑,以解释您希望其他进程如何处理这些引用。只是将它们传回原始进程吗? - Armin Rigo
我修改了问题。谢谢你的回答,非常有见地。 - User
你是正确的,带有代理的对象存在于自己的进程中。但不是将其传输到该进程,而是首先在那里创建它。特殊对象的代理可以很容易地制作。您只需创建multiprocessing.manager.BaseManager的子类,然后调用 YourManager.register(YourClass)。这对于每个类都应该有效。而且它确实不会妨碍你,因为通常你知道哪些对象必须在进程之间共享,哪些不必共享。 - Kritzefitz
1
所以,我在上一篇帖子中想要表达的是,我没有看到任何一个例子能够证明将对象传递给管理器比在管理器中创建对象更好。 - Kritzefitz
那么,这完全回答了你的问题,还是有什么不清楚的地方吗? - Kritzefitz
显示剩余4条评论
3个回答

12

大多数情况下,将现有对象的引用传递给另一个进程并不是一个理想的选择。相反,你应该创建你想要在进程之间共享的类:

class MySharedClass:
    # stuff...

那么您可以像这样创建一个代理管理器:

import multiprocessing.managers as m
class MyManager(m.BaseManager):
    pass # Pass is really enough. Nothing needs to be done here.

接着你可以像这样在那个管理器上注册你的类:

MyManager.register("MySharedClass", MySharedClass)

当经理实例化并启动后,你可以使用 manager.start() 创建你的类的共享实例 manager.MySharedClass。这对所有需求都应该起作用。返回的代理对象的功能与原始对象完全相同,除了一些在文档中描述的异常情况。


1
太好了!我测试过了,它运行得非常好。http://codepad.org/zW2LU6XV 仍然存在并发问题,但这些问题没关系。 - User
然而,这并没有解决问题。我已经将这段代码用作MySharedClass的模板,其中包含(模拟)数据库游标。如果我尝试在MySharedClass方法中返回它,我会得到Unserializable Message错误。 - sinwav
1
@sinwav 在我理解中,进程之间无法共享数据库游标。无论你在进程之间使用何种传输机制,在某个时刻对象都需要以某种方式进行序列化。Python使用pickling来实现这一目的。如果无法对某个对象进行pickling,那肯定有其原因。对于数据库游标而言,问题在于游标只在创建它的连接上有效,而该连接只在一个进程中打开。因此,数据库游标仅在创建它的进程中有效。这意味着无法在多个进程之间共享。 - Kritzefitz

5

在阅读本答案之前,请注意所述解决方案非常糟糕。请在答案末尾注意警告。

我找到了一种通过分享对象状态的方法。 因此,我编写了这个类,可以透明地通过所有进程共享其状态:

import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    l = arr[:]
    return bytes(arr)

您可以将此类(及其子类)的实例传递到任何其他进程中,它将通过所有进程同步其状态。

以下代码已进行测试:

class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)

这个类的“魔法”在于它将所有属性存储在另一个Store类的实例中。这个类并不是非常特殊,它只是一个可以具有任意属性的类。(一个字典也可以胜任。)
然而,这个类有一些非常讨厌的怪癖。我发现了其中两个。
第一个怪癖是你必须指定Store实例最多占用多少空间。这是因为multiprocessing.Array具有静态大小。所以可以在其中pickle的对象只能和数组一样大。
第二个怪癖是你不能将这个类与ProcessPoolExecutors或简单的Pools一起使用。如果你尝试这样做,就会出现错误:
>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
... 
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

警告
您可能不应该使用这种方法,因为它会使用无法控制的大量内存,与使用代理相比过于复杂(请参见我的另一个答案),并且可能以惊人的方式崩溃。


3

只需使用无栈Python即可。您可以使用pickle序列化几乎任何东西,包括函数。在这里,我使用pickle模块对lambda进行序列化和反序列化。这类似于您在示例中尝试做的事情。

以下是Stackless Python的下载链接:http://www.stackless.com/wiki/Download

Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> f = 5
>>> g = lambda : f * f
>>> g()
25
>>> import pickle
>>> p = pickle.dumps(g)
>>> m = pickle.loads(p)
>>> m()
25
>>> 

+1 这很好,但是 1. 它是否保留了 m 是 g 的身份,2. 如果我序列化函数并在另一个进程中反序列化它,它会在原始进程中被调用吗?- 不会。但如果需要保存函数而进程关闭,则这绝对是一个不错的解决方案。 - User

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接