Pandas和多进程内存管理:将DataFrame分成多个块

8
我需要按行处理一个巨大的pandas.DataFrame(几十GB),每一行操作都非常耗时(数十毫秒)。所以我想把框架分成块,使用multiprocessing并行处理每个块。这确实加快了任务的速度,但内存消耗是一场噩梦。
尽管每个子进程原则上只应该消耗少量数据,但它需要(几乎)与包含原始DataFrame的父进程一样多的内存。即使在父进程中删除已使用的部分也没有帮助。
我写了一个最小示例来复制这种行为。它唯一做的就是创建一个具有随机数字的大型DataFrame,将其分成最多100行的小片段,并在mp.Pool(这里是大小为4)中通过一些信息简单地打印有关DataFrame的内容。
在并行执行的主函数:
def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))

生成器助手可以将 DataFrame 分割成小块:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)

主程序如下:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')

标准输出如下:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE

问题:

主进程需要大约120MB的内存。然而,池子的子进程需要相同数量的内存,尽管它们仅包含原始DataFrame的1%(大小为100的块与原始长度为10000的数据)。为什么?

我该怎么办?Python(3)是否会将整个DataFrame发送到每个子进程中,而不管我的块处理方式?这是pandas内存管理的问题,还是multiprocessing和数据pickling的问题?谢谢!



完整的脚本,供您简单复制和粘贴,以便自行尝试:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()

1
有点老了,但仍然有效:https://dev59.com/6WPVa4cB1Zd3GeqP3z-1基本上 - 你看到的 :) 可能不是“真实”的; - opalczynski
好的,谢谢,那可能就解释了^^ - SmCaterpillar
我不得不收回,如果我使用所有8个核心(在我的实际问题中,需要处理几十GB的数据,父进程需要大约22%的RAM,子进程也需要),在某些时候所有子进程都会吞噬所有内存,然后整个程序就崩溃了。如果我只使用4个核心,虽然需要两倍的时间,但成功运行且不会崩溃。所以虚拟内存确实转化为物理内存 :-( - SmCaterpillar
@SmCaterpillar 我一直在密切关注您的示例。甚至还对主要DF的使用部分进行了整理。但是在我的情况下,仅通过消除使用的行,每次只能将DF减少40行。此外,我无法像您那样利用块,因为DF必须以自定义方式分块。很高兴听取您的想法:https://stackoverflow.com/questions/62545562/multiprocessing-with-large-iterable?noredirect=1#comment110609639_62545562 - Kdog
2个回答

5

好的,我在Sebastian Opałczyński在评论中提供的提示后解决了这个问题。

问题在于子进程是从父进程派生的,因此它们都包含对原始 DataFrame 的引用。但是,在原始进程中操作帧,因此 写时复制 行为会慢慢地杀死整个进程,并在物理内存限制达到时终止进程。

有一个简单的解决方案:不使用 pool = mp.Pool(n_jobs),而是使用 multiprocessing 的新上下文特性:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

这可以确保Pool进程只是从父进程中衍生出来,而不是被fork出来。因此,它们都无法访问原始的DataFrame,并且只需要父进程内存的一小部分。
请注意,mp.get_context('spawn')仅适用于Python 3.4及更高版本。

非常有趣的问题!!这适用于Windows还是Linux或两者都适用?!我心中想到的另一种解决方案是将数据框分成块,将它们放入列表中(以供“map”使用),并在调用“multiprocessing”之前从内存中删除原始数据框。在你看来,这样行得通吗? - ℕʘʘḆḽḘ

1
更好的实现方式是使用pandas中的分块数据框实现生成器,并将其提供给"pool.imap"函数。 pd.read_csv('<filepath>.csv', chucksize=<chunksize>) https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html 好处:它不会在主进程中读取整个数据框(节省内存)。每个子进程只会指向它需要的块。--> 解决子进程内存问题。
开销:它要求您首先将数据框保存为csv文件,然后再使用pd.read_csv读取 --> I/O时间。
注意:chunksize对于pd.read_pickle或其他压缩存储的加载方法不可用。
def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel

    df_chunked = pd.read_csv('<filepath>.csv',chunksize = chunksize) # modified
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, df_chunked) # modified

    pool.close()
    pool.join()

    print('DONE')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接