多进程:如何在进程间共享一个大的只读对象?

132

使用multiprocessing创建的子进程会共享在程序之前创建的对象吗?

我的设置如下:

do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load('file.bin')
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))
我正在将一些大型对象加载到内存中,然后创建一个工作进程池来利用该大型对象。这个大型对象是只读的,我不需要在进程之间传递它的修改。
我的问题是:这个大型对象会被加载到共享内存中吗?就像在unix/c中生成进程时那样,还是每个进程都会加载自己的大型对象?
更新:进一步澄清 - big_lookup_object 是一个共享的查找对象。我不需要将其拆分并单独处理。我需要保留单个副本。我需要拆分的工作是读取许多其他大型文件,并根据查找对象在这些大型文件中查找项目。
进一步更新:数据库是一个好的解决方案,memcached 可能更好,磁盘上的文件(shelve 或 dbm)可能甚至更好。在这个问题中,我特别感兴趣的是一个内存中的解决方案。对于最终的解决方案,我将使用 hadoop,但我也想看看是否可以有一个本地的内存版本。

你的代码将会为父进程以及每个子进程(每个进程都会导入该模块)调用 marshal.load - jfs
你是对的,已经更正了。 - Parand
如果您想要避免复制并且使用“本地内存”,那么以下链接可能会有所帮助:http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes - jfs
2
分享的进程数量(例如 fork 或 exec)是调用进程的完全副本,但在不同的内存中。为了让一个进程与另一个进程通信,需要使用 进程间通信 或 IPC,在某些共享的内存位置进行读写操作。 - ron
你可以使用functools中的partial函数,需要将big_lookup_object作为参数传递给do_some_processing函数。当你想要将lambda表达式传递给Pool.map()或普通的Python map函数时,这也非常有用。 - chess
8个回答

60

通过multiprocessing生成的子进程是否共享程序之前创建的对象?

对于Python < 3.8,不共享对于Python ≥ 3.8,共享

进程具有独立的内存空间。

解决方案1

为了最大限度地利用具有许多工作进程的大型结构,请执行此操作。

  1. Write each worker as a "filter" – reads intermediate results from stdin, does work, writes intermediate results on stdout.

  2. Connect all the workers as a pipeline:

    process1 <source | process2 | process3 | ... | processn >result
    

每个进程都会读取、执行任务并写入结果。

这种方式非常高效,因为所有进程都在并发运行。读取和写入操作直接通过进程间共享的缓冲区进行。


解决方案2

在某些情况下,您可能会有一个更复杂的结构 - 通常是一个扇出结构。 在这种情况下,您有一个具有多个子节点的父节点。

  1. 父进程打开源数据。 父进程分叉出多个子进程。

  2. 父进程读取源代码,将源代码的部分分配给每个并发运行的子进程。

  3. 当父进程到达末尾时,关闭管道。 子进程获得文件末尾并正常完成。

编写子部分很容易,因为每个子进程只需读取sys.stdin

父进程需要进行一些花哨的工作来生成所有子进程并正确保留管道,但这不太难。

扇入是相反的结构。 许多独立运行的进程需要将其输入交错到一个公共进程中。 收集器不太容易编写,因为它必须从许多来源读取。

从许多命名管道中读取通常使用select模块来查看哪些管道具有挂起的输入。


解决方案3

共享查找是数据库的定义。

解决方案3A - 加载数据库。让工作人员处理数据库中的数据。

解决方案3B - 使用werkzeug(或类似工具)创建一个非常简单的服务器,提供响应HTTP GET的WSGI应用程序,以便工作人员可以查询服务器。


解决方案4

共享文件系统对象。Unix操作系统提供了共享内存对象。这些只是映射到内存的文件,因此进行交换I/O而不是更常规的缓冲读取。

您可以以多种方式从Python上下文中执行此操作:

  1. 编写启动程序,(1)将原始巨大对象拆分为较小的对象,(2)启动工作进程,每个进程使用较小的对象。较小的对象可以是pickled Python对象,以节省一点文件读取时间。

  2. 编写启动程序,(1)读取原始巨大对象并使用seek操作编写页面结构化的字节编码文件,以确保可以轻松查找单个部分。这就是数据库引擎所做的——将数据分成页面,使每个页面易于通过seek定位。

生成具有访问此大型页面结构化文件的工作进程。每个工作进程都可以寻找相关部分并在那里完成其工作。


我的进程并不是真正的过滤器;它们都是相同的,只是处理不同的数据片段。 - Parand
它们通常可以被构建为过滤器。它们读取其数据片段,执行工作,然后将结果写入以供稍后处理。 - S.Lott
我喜欢你的解决方案,但阻塞 I/O 怎么办?如果父进程阻塞读/写其中一个子进程,会发生什么?Select 通知你可以写入,但它不说有多少。读取也是一样。 - Cristian Ciupitu
我可以验证S.Lott所说的话。我需要在单个文件上执行相同的操作。因此,第一个工作进程对每一行进行运算(其中行号% 2 == 0),并将其保存到文件中,然后将其他行发送到下一个管道进程(这是相同的脚本)。运行时降低了一半。这有点hacky,但是开销比multiprocessing模块中的map/poop轻得多。 - Vince
@Vince:这种扩展是有限制的,但你可以尝试将文件分成8份,看看时间是否会缩短为1/8。通常情况下,确实会这样。 - S.Lott
显示剩余2条评论

40

多进程通过multiprocessing模块生成的子进程是否共享在程序中先前创建的对象?

这要看情况而定。对于全局只读变量(除了占用内存),通常可以认为是这样的,否则不应该。

multiprocessing的文档说:

继承比pickle / unpickle好

在Windows上,许多来自multiprocessing的类型需要可pickle化,以便子进程可以使用它们。但是,一般应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,使需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

显式地将资源传递给子进程

在Unix上,子进程可以利用在父进程中创建的共享资源使用全局资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(潜在)与Windows兼容外,这还确保只要子进程仍然存在,该对象就不会在父进程中被垃圾回收。如果在父进程中垃圾回收对象时释放某些资源,则这可能很重要。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有的话)可能与在调用Process.start()时在父进程中的值不同。

示例

在Windows上(单CPU):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

使用 sleep 命令:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

没有 sleep

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4

7
哎? z 是如何在进程间共享的? - cbare
10
@cbare: 很好的问题!实际上,像使用 sleep 命令时所显示的那样,变量 z 是没有被共享的。当没有使用 sleep 命令时,输出表明 单个 进程(PID = 1148)处理所有的工作;我们在最后一个例子中看到的是这个单个进程的变量 z 的值。 - Eric O. Lebigot
1
这个答案表明z没有被共享。因此,它回答了这个问题:"在Windows下,父变量至少不会在子进程之间共享"。 - Eric O. Lebigot
@EOL:从技术上讲,你是正确的,但实际上,如果数据是只读的(不像“z”案例),它可以被视为共享的。 - jfs
只是想澄清一下,2.7文档中的语句“请记住,如果运行在子进程中的代码尝试访问全局变量...”是指在Windows下运行Python。 - user1071847

34

S.Lott 是正确的。Python 的多进程快捷方式实际上为您提供了一个单独的、重复的内存块。

在大多数 *nix 系统上,使用更低级别的 os.fork() 调用实际上会给你复制时写入内存的内存,这可能是你想要的。据我所知,在最简单的理论程序中,您可以在没有复制它的情况下读取这些数据。

然而,在 Python 解释器中,事情并不那么简单。对象数据和元数据存储在同一内存段中,因此即使对象从未更改,增加该对象的引用计数器也将导致内存写入,因此会进行复制。几乎任何做更多工作的 Python 程序都会导致引用计数增加,因此您很可能永远无法意识到写时复制的好处。

即使有人成功地在 Python 中实现了共享内存解决方案,尝试跨进程协调垃圾回收也可能会非常困难。


7
在这种情况下,只有引用计数的内存区域会被复制,而不一定是大的只读数据,对吗? - kawing-chiu

8
如果你在Unix下运行,由于fork如何工作(即子进程具有单独的内存但它是写时复制的,因此只要没有人修改它,它可能会被共享),它们可能共享相同的对象。我尝试了以下操作:
import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

并且得到了以下输出:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

当然,这并不能证明没有复制,但您可以通过查看每个子进程使用的实际内存量来验证您的情况的ps的输出。


2
垃圾收集器怎么样?它运行时会发生什么?内存布局不会改变吗? - Cristian Ciupitu
这是一个合理的担忧。它是否会影响Parand取决于他如何使用所有这些内容以及这段代码需要多可靠。如果对他不起作用,我建议使用mmap模块来获得更多控制(假设他想坚持这种基本方法)。 - Jacob Gabrielson
我已经在你的示例中发布了一个更新:https://dev59.com/sXRB5IYBdhLWcg3wUVxd#660468 - jfs
@JacobGabrielson:复制已完成。原始问题是关于复制是否已完成的。 - abhinavkulkarni

3

3
我认为对于这个问题,IPC并不合适;因为这是每个人都需要访问的只读数据。在进程之间传递它没有任何意义;最坏的情况下,每个进程都可以读取自己的副本。我试图通过不在每个进程中单独保存一份副本来节省内存。 - Parand
你可以有一个主进程将数据的片段委派给其他从进程处理。从进程可以请求数据或推送数据。这样,不是每个进程都拥有整个对象的副本。 - Vasil
1
@Vasil:如果每个进程都需要整个数据集,并且只是在其上运行不同的操作,该怎么办? - Will
我必须补充说明,大量数据传输可能会影响并行性能。即使不是 OP 问题的一部分,对于易变性工作来说,同样的观点也是有效的,其中不需要原子性。 - RomuloPBenedetti

1
不行,但是你可以将数据作为子进程加载,并允许其与其他子进程共享数据。请参见以下内容。
import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    

1

虽然与多进程本身没有直接关系,但从您的示例中,似乎您可以使用shelve模块或类似的东西。 "big_lookup_object"是否真的必须完全在内存中?


好的观点,我还没有直接比较过磁盘存储和内存存储的性能。我一直以为会有很大的差异,但实际上我还没有测试过。 - Parand

-3

对于 Linux/Unix/MacOS 平台,forkmap 是一个快速而不精致的解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接