Python:预加载内存

25

我有一个Python程序,需要加载和反序列化一个1GB的pickle文件。它需要约20秒时间,我希望有一种机制可以使pickle的内容随时可用。我查看了shared_memory,但其所有示例都涉及numpy,而我的项目没有使用numpy。使用shared_memory或其他方式,最简单、最干净的实现方法是什么?

这是我现在加载数据的方法(每次运行):

def load_pickle(pickle_name):
    return pickle.load(open(DATA_ROOT + pickle_name, 'rb'))

我希望能够在运行期间编辑模拟代码,而不必重新加载pickle文件。我一直在试验importlib.reload,但对于包含多个文件的大型Python程序来说似乎并不奏效:

def main():
    data_manager.load_data()
    run_simulation()
    while True:
        try:
            importlib.reload(simulation)
            run_simulation()
        except:
        print(traceback.format_exc())
        print('Press enter to re-run main.py, CTRL-C to exit')
        sys.stdin.readline()

什么是数据?你需要一次性加载所有数据吗? - mkst
1
看起来 shared_memory 将信息存储为字节缓冲区。如果您不想共享数组,那么您可能需要重新序列化数据以保存在其中。 - thshea
4
我不理解你试图解决的问题是什么。如果数据需要“随时可用”,那么为什么首先要将其 pickle 化,而不是直接保留对象?为什么程序要重新启动,特别是如果需要避免加载时间的话? - Karl Knechtel
1
有什么阻止你拥有一个主程序并将模拟重新格式化为可导入的类吗?然后让主程序一直运行(并在启动时启动),加载数据,并且每当您想要模拟时,*重新导入新的模拟类(如果可能),复制数据并传递它。 - thshea
2
你说你的代码不使用 numpy,但它使用了什么呢?你需要在运行之间保存什么庞大的数据结构?你不能将整个 Python 对象保存到某种共享内存空间中,如果你尝试这样做,会严重破坏解释器的内存管理。但是根据你的数据实际情况,你可能能够共享一些东西,我们只有在了解数据的情况下才能知道它是什么。 - Blckknght
显示剩余18条评论
9个回答

7
这可能是一个XY问题,其源头在于假设您必须使用pickle;因为它们处理依赖项的方式非常糟糕,从根本上讲长期存储数据时并不是一个好选择。
源金融数据几乎肯定以某种表格形式开始,因此有可能请求以更友好的格式呈现。
同时,简单的中间件可以反序列化和重新序列化pickle,从而平滑过渡。
input -> load pickle -> write -> output

将工作流转换为使用Parquet或Feather,这些格式被设计为读写高效,几乎肯定会大大提高加载速度。

更多相关链接:


您也可以使用hickle来实现此目的,它将在内部使用HDH5格式,理想情况下比pickle快得多,同时仍然像pickle一样运行


我不知道为什么,但是hickle并不能完全替代pickle - 我不得不重写代码 - 然后它变得非常慢。 - etayluz
绝对不是插拔式的,但这样的解决方案可以缓解政治问题,因为它很容易比较。 - ti7

6

将未pickle化的数据存储在内存中的替代方法是将pickle存储在ramdisk中,只要大部分时间开销来自磁盘读取。以下是示例代码(在终端中运行)。

sudo mkdir mnt/pickle
mount -o size=1536M -t tmpfs none /mnt/pickle
cp path/to/pickle.pkl mnt/pickle/pickle.pkl 

然后,您可以访问mnt/pickle/pickle.pkl中的pickle文件。注意,您可以将文件名和扩展名更改为任何内容。如果磁盘读取不是最大的瓶颈,可能不会看到速度提升。如果您的内存不足,可以尝试降低ramdisk的大小(我将其设置为1536 mb或1.5 GB)。


请注意,以下内容仅适用于Linux(特别是Ubuntu;我不确定它是否适用于其他系统)。如果您使用的是Windows或Mac,则需要遵循不同的过程。 - thshea
这看起来很有趣 - 但我的程序也需要在Windows上运行。我需要一个跨平台的解决方案。 - etayluz

3
你可以使用可共享的列表: 因此,您将运行一个Python程序,该程序将加载文件并将其保存在内存中,另一个Python程序可以从内存中获取文件。您的数据,无论是什么,都可以将其加载到字典中,然后转储为JSON,再重新加载JSON。 所以

程序1

import pickle
import json
from multiprocessing.managers import SharedMemoryManager
YOUR_DATA=pickle.load(open(DATA_ROOT + pickle_name, 'rb'))
data_dict={'DATA':YOUR_DATA}
data_dict_json=json.dumps(data_dict)
smm = SharedMemoryManager()
smm.start() 
sl = smm.ShareableList(['alpha','beta',data_dict_json])
print (sl)
#smm.shutdown() commenting shutdown now but you will need to do it eventually


输出结果会像这样。
#OUTPUT
>>>ShareableList(['alpha', 'beta', "your data in json format"], name='psm_12abcd')

现在在Program2中:

from multiprocessing import shared_memory
load_from_mem=shared_memory.ShareableList(name='psm_12abcd')
load_from_mem[1]
#OUTPUT
'beta'
load_from_mem[2]
#OUTPUT
yourdataindictionaryformat



你可以在这里查找更多信息: https://docs.python.org/3/library/multiprocessing.shared_memory.html

1
你确定这个可扩展性吗?我期望Manger代码将会pickle并通过IPC发送相同的数据,以便提供给问题提出者高效地使用,因此在一个程序中预加载它可能没有任何意义。 - Blckknght
1
它被预装在内存中。问答者目前每次运行程序都必须从磁盘加载数据。采用这种方法,数据将被加载到内存中,并为另一个程序提供引用以加载该数据。他需要一些可以从内存中获取文件的东西。而这段代码正实现了这个目的。如果操作系统进程后还有足够的内存,它将运行1GB的数据。 - ibadia
File "/Users/etayluz/stocks/src/data_loader.py", line 19, in main sl = smm.ShareableList(['alpha', 'beta', data_dict_json]) File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py", line 1363, in ShareableList sl = shared_memory.ShareableList(sequence) File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/shared_memory.py", line 308, in __init__ assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len AssertionError - etayluz
@ibadia,你有什么想法这个错误是关于什么的吗? - etayluz

3

另一个有挑战性的假设是,可能是您读取文件的位置造成了很大的差异。

在当今系统中,1G 的数据量并不大;20 秒的加载时间仅相当于 50MB/s,这只是甚至比最慢的磁盘提供的速度还要慢。

您可能会发现,实际上您的瓶颈是慢速磁盘或某种类型的网络共享,而更换更快的存储介质或压缩数据(例如使用 gzip)对读写操作产生很大影响。


感谢您的评论。我正在2018年款MacBook Pro上本地运行,这里没有出现任何问题。 - etayluz

2

我的理解是:

  • 需要加载某些东西
  • 这些东西经常需要被加载,因为使用这些东西的代码文件经常被编辑
  • 你不想每次都等待它被加载

也许以下解决方案适合您。

您可以按照以下方式编写脚本加载程序(已在Python 3.8上测试):

import importlib.util, traceback, sys, gc

# Example data
import pickle
something = pickle.loads(pickle.dumps([123]))

if __name__ == '__main__':
    try:
        mod_path = sys.argv[1]
    except IndexError:
        print('Usage: python3', sys.argv[0], 'PATH_TO_SCRIPT')
        exit(1)

    modules_before = list(sys.modules.keys())
    argv = sys.argv[1:]
    while True:
        MOD_NAME = '__main__'
        spec = importlib.util.spec_from_file_location(MOD_NAME, mod_path)
        mod = importlib.util.module_from_spec(spec)

        # Change to needed global name in the target module
        mod.something = something
        
        sys.modules[MOD_NAME] = mod
        sys.argv = argv
        try:
            spec.loader.exec_module(mod)
        except:
            traceback.print_exc()
        del mod, spec
        modules_after = list(sys.modules.keys())
        for k in modules_after:
            if k not in modules_before:
                del sys.modules[k]
        gc.collect()
        print('Press enter to re-run, CTRL-C to exit')
        sys.stdin.readline()

模块示例:

# Change 1 to some different number when first script is running and press enter
something[0] += 1 
print(something)

应该可以工作。并且应该将pickle重新加载的时间减少到接近零

更新 添加了通过命令行参数接受脚本名称的可能性


这个想法很好,但在实践中似乎不起作用。除非我退出并重新启动,否则我对程序中任何文件所做的更改都不会反映出来。 - etayluz
@etayluz 奇怪。你能否提供一些代码示例或其他内容,以展示它在哪方面出现问题?我不确定是否理解了,在这种情况下,该脚本应该如何失败,因为它应该从字面上卸载所有已加载的模块。或者它会崩溃?或者它会重新运行相同的代码?总之,请提供更多信息。 - CPPCPPCPPCPPCPPCPPCPPCPPCPPCPP
@etayluz 加了 gc.collect() 以防万一。不确定是否会有所改变。我已经没有任何想法来修复我从未见过的问题 :D - CPPCPPCPPCPPCPPCPPCPPCPPCPPCPP
  • 增加了在脚本执行时使用ctrl+c的可能性。因此,需要双击ctrl+c才能停止执行。
- CPPCPPCPPCPPCPPCPPCPPCPPCPPCPP
你的代码对于一个文件看起来很棒:mod_name, mod_path = 'some_file', 'some_file.py' - 但是我的程序有大约50个文件。我该如何重新加载每个文件? - etayluz
显示剩余4条评论

2

在翻译这篇答案时,以下是我做出的假设:

  1. 您的财务数据是在进行复杂操作后产生的,并且希望结果能够持久存在内存中
  2. 消费代码必须能够快速访问该数据
  3. 您希望使用共享内存

以下是代码(我认为是不言自明的)

数据结构

'''
Nested class definitions to simulate complex data
'''

class A:
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def get_attr(self):
        return self.name, self.value

    def set_attr(self, n, v):
        self.name = n
        self.value = v


class B(A):
    def __init__(self, name, value, status):
        super(B, self).__init__(name, value)
        self.status = status

    def set_attr(self, n, v, s):
        A.set_attr(self, n,v)
        self.status = s

    def get_attr(self):
        print('\nName : {}\nValue : {}\nStatus : {}'.format(self.name, self.value, self.status))

Producer.py

from multiprocessing import shared_memory as sm
import time
import pickle as pkl
import pickletools as ptool
import sys
from class_defs import B


def main():

    # Data Creation/Processing
    obj1 = B('Sam Reagon', '2703', 'Active')
    #print(sys.getsizeof(obj1))
    obj1.set_attr('Ronald Reagon', '1023', 'INACTIVE')
    obj1.get_attr()

    ###### real deal #########

    # Create pickle string
    byte_str = pkl.dumps(obj=obj1, protocol=pkl.HIGHEST_PROTOCOL, buffer_callback=None)
    
    # compress the pickle
    #byte_str_opt = ptool.optimize(byte_str)
    byte_str_opt = bytearray(byte_str)
    
    # place data on shared memory buffer
    shm_a = sm.SharedMemory(name='datashare', create=True, size=len(byte_str_opt))#sys.getsizeof(obj1))
    buffer = shm_a.buf
    buffer[:] = byte_str_opt[:]

    #print(shm_a.name)               # the string to access the shared memory
    #print(len(shm_a.buf[:]))

    # Just an infinite loop to keep the producer running, like a server
    #   a better approach would be to explore use of shared memory manager
    while(True):
        time.sleep(60)


if __name__ == '__main__':
    main()

Consumer.py

from multiprocessing import shared_memory as sm
import pickle as pkl
from class_defs import B    # we need this so that while unpickling, the object structure is understood


def main():
    shm_b = sm.SharedMemory(name='datashare')
    byte_str = bytes(shm_b.buf[:])              # convert the shared_memory buffer to a bytes array

    obj = pkl.loads(data=byte_str)              # un-pickle the bytes array (as a data source)

    print(obj.name, obj.value, obj.status)      # get the values of the object attributes


if __name__ == '__main__':
    main()

当在一个终端中执行Producer.py时,它会发出一个字符串标识符(比如wnsm_86cd09d4)用于共享内存。将此字符串输入到Consumer.py中,并在另一个终端中执行它。
只需在同一台机器上的一个终端中运行Producer.py,在另一个终端中运行Consumer.py即可。
希望这是您想要的!

你知道上面的错误是什么吗?有关于“ptool”的一些东西。 - etayluz
谢谢@anurag - 那么上面的创建错误怎么解决? - etayluz
@etayluz,我非常确定你混淆了**@ibadia**的答案和我的答案。smm变量来自他的答案-他正在使用SharedMemoryManager。虽然我没有尝试过,但这就是导致错误的原因。尝试完全使用我的代码看看是否有效! - anurag
你能分享一下你的代码吗?我无法重现这个问题。我已经简化了代码,并请求你从一个干净的状态尝试它! - anurag
让我们在聊天中继续这个讨论 - etayluz
显示剩余10条评论

2
您可以利用多进程来在子进程中运行模拟,并利用复制-写入(copy-on-write)的分支优势,仅在开始时解封/处理数据一次:
import multiprocessing
import pickle


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')


# Load data once, in the parent process
data = pickle.load(open(DATA_ROOT + pickle_name, 'rb'))


def _run_simulation(_):
    # Wrapper for `run_simulation` that takes one argument. The function passed
    # into `multiprocessing.Pool.map` must take one argument.
    run_simulation()


with mp.Pool() as pool:
    pool.map(_run_simulation, range(num_simulations))

如果您想对每个模拟运行进行参数化,可以按照以下方式操作:

import multiprocessing
import pickle


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')


# Load data once, in the parent process
data = pickle.load(open(DATA_ROOT + pickle_name, 'rb'))


with mp.Pool() as pool:
    simulations = ('arg for simulation run', 'arg for another simulation run')
    pool.map(run_simulation, simulations)

这样,run_simulation函数将会接收到来自simulations元组的值,这可以允许每个模拟使用不同的参数运行,甚至只是为了记录/保存目的分配每次运行一个ID编号或名称。

整个方法都依赖于fork的可用性。有关在Python的内置multiprocessing库中使用fork的更多信息,请参见有关上下文和启动方法的文档。您可能还希望考虑使用forkserver多进程上下文(使用mp = multiprocessing.get_context('fork'))出于文档中所述的原因。


如果您不想并行运行模拟,则可以调整此方法以适用于该需求。关键的是,为了只处理数据一次,您必须在处理数据的进程或其子进程之一中调用run_simulation

例如,如果您想编辑run_simulation执行的操作,然后再次运行它,您可以使用类似以下代码的方式进行:

main.py:

import multiprocessing
from multiprocessing.connection import Connection
import pickle

from data import load_data


# Load/process data in the parent process
load_data()
# Now child processes can access the data nearly instantaneously


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork') # Consider using 'forkserver' instead


# This is only ever run in child processes
def load_and_run_simulation(result_pipe: Connection) -> None:
    # Import `run_simulation` here to allow it to change between runs
    from simulation import run_simulation
    # Ensure that simulation has not been imported in the parent process, as if
    # so, it will be available in the child process just like the data!
    try:
        run_simulation()
    except Exception as ex:
        # Send the exception to the parent process
        result_pipe.send(ex)
    else:
        # Send this because the parent is waiting for a response
        result_pipe.send(None)


def run_simulation_in_child_process() -> None:
    result_pipe_output, result_pipe_input = mp.Pipe(duplex=False)
    proc = mp.Process(
        target=load_and_run_simulation,
        args=(result_pipe_input,)
    )
    print('Starting simulation')
    proc.start()
    try:
        # The `recv` below will wait until the child process sends sometime, or
        # will raise `EOFError` if the child process crashes suddenly without
        # sending an exception (e.g. if a segfault occurs)
        result = result_pipe_output.recv()
        if isinstance(result, Exception):
            raise result # raise exceptions from the child process
        proc.join()
    except KeyboardInterrupt:
        print("Caught 'KeyboardInterrupt'; terminating simulation")
        proc.terminate()
    print('Simulation finished')


if __name__ == '__main__':
    while True:
        choice = input('\n'.join((
            'What would you like to do?',
            '1) Run simulation',
            '2) Exit\n',
        )))
        if choice.strip() == '1':
            run_simulation_in_child_process()
        elif choice.strip() == '2':
            exit()
        else:
            print(f'Invalid option: {choice!r}')

data.py:

from functools import lru_cache

# <obtain 'DATA_ROOT' and 'pickle_name' here>


@lru_cache
def load_data():
    with open(DATA_ROOT + pickle_name, 'rb') as f:
        return pickle.load(f)

simulation.py:

from data import load_data


# This call will complete almost instantaneously if `main.py` has been run
data = load_data()


def run_simulation():
    # Run the simulation using the data, which will already be loaded if this
    # is run from `main.py`.
    # Anything printed here will appear in the output of the parent process.
    # Exceptions raised here will be caught/handled by the parent process.
    ...

上述三个文件应该都在同一个目录下,和一个可以为空的__init__.py文件一起。 main.py文件可以根据您的喜好进行重命名,并且是此程序的主要入口点。您可以直接运行simulation.py,但这将导致长时间加载/处理数据,这是您最初遇到的问题。当main.py正在运行时,可以编辑simulation.py文件,因为每次从main.py运行模拟时,它会重新加载。

对于macOS用户:在macOS上分叉可能有点错误,这就是为什么Python默认使用spawn方法在macOS上进行多进程处理,但仍然支持forkforkserver。如果遇到崩溃或与多进程相关的问题,请尝试将OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES添加到您的环境中。有关更多详细信息,请参见https://dev59.com/KVUL5IYBdhLWcg3wjIpQ#52230415

@etayluz 不需要。请看我的回答底部的方法。包含“run_simulation”的文件每次都会重新导入。您可以编辑该文件,然后在提示符处输入“1”以重新运行它。如果上一次运行仍在进行中,则可以输入“ctrl+c”停止它,然后选择提示符处的“1”。 - Will Da Silva
谢谢!请看我的问题 - 我已经尝试过这种技术,但对于具有大量文件的程序来说它的效果很奇怪。一些模块会重新加载,而其他模块则不会。根据我的经验,这不是一个可靠或可扩展的技术。此时我更倾向于使用生产者->消费者共享内存范例。 - etayluz
1
我明白你现在的意思了!谢谢你澄清了这个问题。让我明天试一下(这里已经很晚了)- 然后再跟你联系。谢谢! - etayluz
我不知道为什么,但我的所有编辑都没有从一个运行到下一个反映出来 :( 我需要每次退出并重新启动。否则它的工作非常好。我尝试编辑了20个不同的文件 - 没有一个起作用。 - etayluz
@etayluz 我很愿意在聊天室里提供帮助:https://chat.stackoverflow.com/rooms/233862/room-for-will-da-silva-and-etayluz - Will Da Silva
显示剩余8条评论

0

这并不是对问题的确切回答,因为问题看起来需要pickle和SHM,但其他人走了弯路,所以我要分享我的一个技巧。它可能会对你有所帮助。这里有一些很好的解决方案,使用pickle和SHM。关于这个,我只能提供更多相同的东西。用稍微修改的酱汁做的同样的意大利面。

我处理你的情况时使用的两个技巧如下。

第一个是使用sqlite3而不是pickle。你甚至可以轻松地开发一个模块作为sqlite的替代品。好处是数据将使用本机Python类型插入和选择,并且您可以定义自己的转换器和适配器函数,这些函数将使用您选择的序列化方法存储复杂对象。可以是pickle或json或任何其他格式。

我所做的是通过构造函数中传递的*args和/或**kwargs定义一个类。它代表了我需要的任何对象模型,然后我从我的数据库中选择“select * from table;”中的行,并让Python在新对象初始化期间解包数据。即使是自定义的数据类型转换,加载大量数据也非常快速。sqlite会为您管理缓冲和IO等内容,并比pickle更快地执行。诀窍是尽可能快地构建和初始化您的对象。我要么子类化dict(),要么使用slots来加速这个过程。 sqlite3与Python一起提供,这也是一个优点。
我的另一种方法是使用ZIP文件和struct模块。您可以构建一个包含多个文件的ZIP文件。例如,对于一个拥有超过400,000个单词的发音字典,我想要一个dict()对象。因此,我使用一个文件,比如说lengths.dat,在其中以二进制格式定义每个键值对的键长度和值长度。然后,我有一个单词文件和一个发音文件,它们一个接一个地排列在一起。当我从文件中加载时,我读取长度并使用它们从另外两个文件构建一个带有其发音的单词dict()。索引bytes()很快,因此创建这样的字典非常快。如果磁盘空间是一个问题,您甚至可以将其压缩,但这会引入一些速度损失。
这两种方法都比pickle占用更少的磁盘空间。第二种方法需要您将所需的所有数据读入RAM,然后构建对象,这将占用几乎是数据占用的两倍的RAM,然后您可以丢弃原始数据。但总体而言,不应该需要比pickle占用更多的内存。至于RAM,如果需要,操作系统将使用虚拟内存/SWAP来管理几乎任何内容。

哦,是的,我使用的第三个技巧。当我有像上面提到的构建为ZIP文件或其他需要在构建对象时进行附加反序列化的东西,并且此类对象的数量很大时,我引入了延迟加载。也就是说,假设我们有一个包含序列化对象的大文件。您使程序加载所有数据并将其分发到列表()或dict()中保留的每个对象中。 您以这样的方式编写类,即当首次请求数据时,它会解压缩其原始数据,进行反序列化等操作,然后从RAM中删除原始数据,然后返回结果。因此,直到您实际需要所需数据,您不会失去加载时间,对于用户而言,这比需要20秒才能启动进程要不可见得多。


无意冒犯,但我认为OP会更喜欢代码而不是散文! - anurag

0

我实现了python-preloaded脚本,可以帮助您在这里。它将在加载一些模块后的早期阶段存储CPython状态,然后当您需要时,您可以从此状态恢复并加载您的普通Python脚本。存储当前意味着它会留在内存中,恢复则意味着会对其进行fork,非常快速。但这些是python-preloaded的实现细节,不应该影响您。

所以,要使它适用于您的用例:

  • 创建一个新的模块,例如data_preloaded.py,并在其中加上以下代码:

    preloaded_data = load_pickle(...)
    
  • 现在运行py-preloaded-bundle-fork-server.py data_preloaded -o python-data-preloaded.bin。这将创建python-data-preloaded.bin文件,可用作python的替代品。

  • 假设您已经先前启动了python your_script.py。那么现在运行./python-data-preloaded.bin your_script.py,或者只运行python-data-preloaded.bin(没有参数)。第一次运行会很慢,大约需要20秒钟。但现在它已经在内存中了。

  • 现在再次运行./python-data-preloaded.bin your_script.py。现在应该非常快,即几毫秒。而且您可以反复启动它,每次都会非常快,直到重新启动计算机为止。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接