我有一个中等大小的文件(约300MB),其中包含一份个人名单(约30万人)以及他们执行的操作。我试图使用groupBy
和apply
的并行版本来为每个个人应用一个操作,具体方法可以在这里找到。它的大致代码如下:
import pandas
import multiprocessing
from joblib import Parallel, delayed
df = pandas.read_csv(src)
patients_table_raw = apply_parallel(df.groupby('ID'), f)
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
但不幸的是,这将占用大量的空间。我认为这与以下简单命令有关:
list_groups = list(df.groupby('ID'))
消耗了几个GB的内存!该怎么办?我的初步想法是将groupBy迭代成小的“堆栈”,不会占用太多内存(但我没有找到一种方法可以在不将其强制转换为列表的情况下这样做)。
更详细的背景
我有一个简单的CSV数据集,如下所示:
|-------------------------|
| ID | Timestamp | Action |
|-------------------------|
|1 | 0 | A |
|1 | 10 | B |
|1 | 20 | C |
|2 | 0 | B |
|2 | 15 | C |
...
我基本上想做的是创建一个不同的表,其中包含个人的行动/时间戳和其ID的描述。这将有助于我检索个体。
|------------------|
| ID | Description |
|------------------|
|1 | 0A10B20C |
|2 | 0B15C |
...
为了实现这个目标,并且遵循Pythonic的方式,我的想法基本上是将第一个表格加载到 Pandas DataFrame 中,按 ID 进行分组,并在分组中应用一个函数,以便为每个组(每个ID)返回我想要的表格的一行。然而,我有大量数据(约1百万个个体),而groupBy 操作非常耗时(没有明确的垃圾回收,正如我在自己的答案中提到的那样)。此外,并行化 groupBy 操作会导致显着的内存使用,因为显然有些东西被重复复制。因此,更详细的问题是:如何使用 groupBy(从而使数据处理比您自己实现的循环更快),而不会出现巨大的内存开销?
dask
。 - chrisbdask
的设计目的就是通过保持您与数据框架的工作方式来解决您的问题。无需添加任何额外的 Java Spark 魔法功能。 - Zeugma