Pandas并行groupBy消耗大量内存

5

我有一个中等大小的文件(约300MB),其中包含一份个人名单(约30万人)以及他们执行的操作。我试图使用groupByapply的并行版本来为每个个人应用一个操作,具体方法可以在这里找到。它的大致代码如下:

import pandas
import multiprocessing
from joblib import Parallel, delayed

df = pandas.read_csv(src)
patients_table_raw = apply_parallel(df.groupby('ID'), f)

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

但不幸的是,这将占用大量的空间。我认为这与以下简单命令有关:

list_groups = list(df.groupby('ID'))

消耗了几个GB的内存!该怎么办?我的初步想法是将groupBy迭代成小的“堆栈”,不会占用太多内存(但我没有找到一种方法可以在不将其强制转换为列表的情况下这样做)。

更详细的背景

我有一个简单的CSV数据集,如下所示:

|-------------------------|
| ID | Timestamp | Action |
|-------------------------|
|1   | 0         | A      |
|1   | 10        | B      |
|1   | 20        | C      |
|2   | 0         | B      |
|2   | 15        | C      |
         ...

我基本上想做的是创建一个不同的表,其中包含个人的行动/时间戳和其ID的描述。这将有助于我检索个体。

|------------------|
| ID | Description |
|------------------|
|1   | 0A10B20C    |
|2   | 0B15C       |
         ...

为了实现这个目标,并且遵循Pythonic的方式,我的想法基本上是将第一个表格加载到 Pandas DataFrame 中,按 ID 进行分组,并在分组中应用一个函数,以便为每个组(每个ID)返回我想要的表格的一行。然而,我有大量数据(约1百万个个体),而groupBy 操作非常耗时(没有明确的垃圾回收,正如我在自己的答案中提到的那样)。此外,并行化 groupBy 操作会导致显着的内存使用,因为显然有些东西被重复复制。因此,更详细的问题是:如何使用 groupBy(从而使数据处理比您自己实现的循环更快),而不会出现巨大的内存开销?

有一个替代方案!使用Apache Spark。 :D - Alberto Bonsanto
3
这句话的意思是:在处理这个大小的文件时,你是否真的需要并行处理?串行处理会更容易。如果你确实需要并行处理,可以考虑使用dask - chrisb
dask 的设计目的就是通过保持您与数据框架的工作方式来解决您的问题。无需添加任何额外的 Java Spark 魔法功能。 - Zeugma
1
我同意@chrisb的观点 - 我认为你不需要并行计算 - 使用Pandas时很多时候会更慢。你能否发布一个带有样本数据集、你要实现的简短描述和期望数据集的问题 - 在这种情况下,Stack Overflow社区可能能够找到最优解决方案。 - MaxU - stand with Ukraine
1
@ManoelRibeiro,我已经添加了一个答案——您能用您的真实数据检查一下吗? - MaxU - stand with Ukraine
显示剩余2条评论
2个回答

2

一些评论和我找到的解决方案:

  • I've tried dask and it didn't made much difference. I guess it is because the file is not big enough to use the secondary memory.

  • The memory performance improves significantly if you perform the garbage collection inside the function you apply to the groups. I've managed to do so with a simple gc.collect() that happens every $10000$ interactions. Something like:

    x['ID'].head(1).values[0] % 10000 == 0:
        gc.collect()
    
  • The garbage collection actually made my parallel version run. But the return pd.concat(retLst) was another huge bottleneck, and consumed tons of memory!

我的最终解决方案是通过外部并行化来解决:

  • I created a function that will perform the groupBy and the apply for individuals with ID's inside a range [X,Y]

  • I simply create a pool and run those in parallel. Each process saves a file with a different name, depending on its range

    f = functools.partial(make_patient_tables2, src="in", dest="out")
    range_of = [(0, 10000), (10000, 20000), (20000, 30000)]
    with Pool(cpu_count()) as p:
        ret_list = p.map(f, range_of)
    
  • Last but not least, I concatenate all the generated files.

请注意,这仍然占用了一些内存,因为我们必须复制表格的读取(这在make_patient_tables2内完成,但无论如何都会发生,因为多进程不共享资源)。因此,更好的解决方案将涉及共享资源,但垃圾收集器+不使用连接符+只复制原始数据2-3次对我来说已经足够了!相当丑陋。希望能对其他人有所帮助。

2

尝试以下代码(不使用并行化):

In [87]: df
Out[87]:
   ID  Timestamp Action
0   1          0      A
1   1         10      B
2   1         20      C
3   2          0      B
4   2         15      C

In [88]: df.set_index('ID').astype(str).sum(axis=1).groupby(level=0).sum().to_frame('Description').reset_index()
Out[88]:
   ID Description
0   1    0A10B20C
1   2       0B15C

嘿!我已经尝试过了,它很有效。奇怪的是,在你的方法中垃圾收集问题并没有发生。然而,这种方法并不完全可推广(实际上在真实数据集中,你有一个更复杂的描述和表格),因此你有点“需要”使用apply函数,因为仅仅求和是无法解决它的。我想知道是否通过应用通用函数会创建所有我遇到的问题。 - Manoel Ribeiro
1
@ManoelRibeiro,一般来说.apply()效率相对较低,所以如果可能的话我们要避免使用它。 - MaxU - stand with Ukraine

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接