Pandas:优化我的代码(groupby() / apply())

11
我有一个形状为1.5M x 128的数据框。我做了以下操作:
  1. 基于6列进行groupby()。这样创建了大约8700个子组,每个子组的形状为538 x 122。
  2. 在每个子组上运行apply()函数。该函数计算每个分类值在每列(即122)中的百分比频率。
因此,我的(伪)代码如下: <df = 从文件中读取数据框> g = df.groupby(grp_cols) g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts()) / len(d.index)) 代码对我来说可以正常工作,现在我正在对其进行性能优化。 apply() 函数需要大约20-25分钟才能运行。我认为问题在于它对每个子组进行了122次(即每列)迭代,而不是最佳方式(考虑到我编写的方式)。
有人可以推荐我尝试加速的方法吗?
我尝试使用 Python 多进程池(8个进程)将子组划分为相等集合以进行处理,但最终出现了一些 pickling 错误...
谢谢。

首先要做的是:g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts(normalize=True))) - user4979733
1个回答

14

pd.DataFrame.groupby.apply能够为我们提供很多的灵活性(与agg/filter/transform 不同,它允许你将每个子组重塑为任何形状,在您的情况下,从 538 x 122 到 N_categories x 122)。但它确实有一个代价:逐个应用您的灵活函数并且缺乏向量化。

我仍然认为解决这个问题的方法是使用 multiprocessing。您遇到的 pickle 错误很可能是因为您在 multi_processing_function 内定义了一些函数。规则是您必须将所有函数移动到顶级。请参见以下代码:

import pandas as pd
import numpy as np

# simulate your data with int 0 - 9 for categorical values
df = pd.DataFrame(np.random.choice(np.arange(10), size=(538, 122)))
# simulate your groupby operations, not so cracy with 8700 sub-groups, just try 800 groups for illustration
sim_keys = ['ROW' + str(x) for x in np.arange(800)]
big_data = pd.concat([df] * 800, axis=0, keys=sim_keys)
big_data.shape

big_data.shape
Out[337]: (430400, 122)

# Without multiprocessing
# ===================================================
by_keys = big_data.groupby(level=0)

sample_group = list(by_keys)[0][1]
sample_group.shape

def your_func(g):
    return g.apply(lambda s: s.value_counts()) / len(g.index)

def test_no_multiprocessing(gb, apply_func):
    return gb.apply(apply_func)

%time result_no_multiprocessing = test_no_multiprocessing(by_keys, your_func)

CPU times: user 1min 26s, sys: 4.03 s, total: 1min 30s
Wall time: 1min 27

这里运行速度相对较慢,我们可以使用multiprocessing模块:

# multiprocessing for pandas dataframe apply
# ===================================================
# to void pickle error, must define functions at TOP level, if we move this function 'process' into 'test_with_multiprocessing', it raises a pickle error
def process(df):
    return df.groupby(level=0).apply(your_func)

def test_with_multiprocessing(big_data, apply_func):

    import multiprocessing as mp

    p = mp.Pool(processes=8)
    # split it into 8 chunks
    split_dfs = np.array_split(big_data, 8, axis=0)
    # define the mapping function, wrapping it to take just df as input
    # apply to each chunk
    df_pool_results = p.map(process, split_dfs)

    p.close()

    # combine together
    result = pd.concat(df_pool_results, axis=0)

    return result


%time result_with_multiprocessing = test_with_multiprocessing(big_data, your_func)

CPU times: user 984 ms, sys: 3.46 s, total: 4.44 s
Wall time: 22.3 s

现在,它的速度更快了,特别是在 CPU 时间方面。虽然在拆分和重新组合结果时会有一些额外开销,但使用 8 核处理器时,预计比非多处理情况要快约4-6倍。

最后,检查两个结果是否相同。

import pandas.util.testing as pdt

pdt.assert_frame_equal(result_no_multiprocessing, result_with_multiprocessing)

优美地通过测试。


非常感谢你,建勋。 - user4979733

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接