如何在多进程中使用共享内存的pandas DataFrame?

6

你考虑过使用dask吗? - rpanai
感谢您的输入。我想使用“df.loc[len(df)] = [x, x]”向数据框添加新行。Dask能否轻松帮助我完成这个任务,并确保以同步方式进行? - user9098935
你的意思是保留顺序吗? - rpanai
1
你可能想看一下这个玩具示例 pastebin - rpanai
1个回答

4

如果您不想使用dask,可以通过将pandas dataframe转换为numpy数组,然后在子进程中重新构建它来使用共享内存共享。

from multiprocessing import shared_memory

def create_shared_block(to_share, dtypes):
    # float64 can't be pickled
    for col, dtype in to_share.dtypes.items():
        if dtype == 'float64':
            to_share[col] = pd.to_numeric(to_share[col], downcast='float')
            
    # make the dataframe a numpy array
    to_share.reset_index(inplace=True)
    
    # drop the index if named index
    to_share = to_share.drop('index', axis=1)
    
    # get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
    dtypes_sorted = sort_dtypes(to_share, dtypes)
    
    # get the dataframe values in the format expected by numpy
    values = [tuple(x) for x in to_share.values.tolist()]
    
    # create a numpy array
    to_share = np.array(values, dtype=(dtypes_sorted))
    
    # create a shared memory of the size of the array
    shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
    
    # now create a NumPy array backed by shared memory
    np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
    
    # Copy the original data into shared memory
    np_array[:] = to_share[:]
    return shm, np_array, dtypes_sorted


def sort_dtypes(df, dtypes):
    # category is a pandas dtype, not numpy
    string_types = ('category', 'object', '|S')
    dtypes = [(x, '|S{}'.format(df[x].str.len().max())) if y in string_types else (x, y) for x, y in dtypes if
              x in df.columns]
    # build a lookup
    dtypes_dict = {x: y for x, y in dtypes}
    # fix the order
    dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
    return dtypes_sorted

# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle','obstacle',2,3],['obstacles','obstacle',2,np.nan]],columns=['w1','w2','d1','d2'])
dtypes = [('w1','str'),('w2','|S'),('d1','f'),('d2','f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)

# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)

# ------CHILD PROCESS-------#
# assuming you have passed to the child process in a variable called shared, you can reconstruct the dataframe as follows
shared_memory = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=shared_memory.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)

在共享一个包含10万行的数据帧时,这种方法可以节省我的应用程序一些内存,但可能不及使用一些已建立库如dask所能节省的内存。 我不太确定重新创建pandas数据帧所涉及的开销 - 我认为它只是引用了共享的numpy数组,并在其上添加了一些额外的内容使其成为数据帧。


1
=Python 3.8 only
- VovaM
我正在寻找一种简单的解决方案,可以在多个进程之间通过共享内存传递一个DataFrame。我理解这里发生的事情,但是否有任何方法可以在不从NumPy数组重构DataFrame的情况下传递它?这似乎是为了一件很常见的事情而需要大量的代码。特别是如果你不知道DataFrame的布局,因此需要依赖大量的迭代和条件逻辑。 - Anthony Nash
据我所知,你应该只是使用DaskPolars,而不是试图使用共享内存创建自己的解决方案。当我写这个时,我不想学习另一个库。然而,我可能花了更多时间来弄清楚如何传递df,而不是学习Dask/Polars。 - forgetso

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接