以内存高效的方式将大型csv文件读入稀疏的pandas数据框。

33

使用Pandas中的read_csv函数似乎没有稀疏选项。我的CSV数据中有很多零值(压缩效果非常好,删除任何0值可以将其减少到原始大小的一半左右)。

我尝试先使用read_csv将其加载到密集矩阵中,然后调用to_sparse,但是这需要很长时间,并且对文本字段进行处理时会发生错误,尽管大部分数据都是浮点数。如果我首先调用pandas.get_dummies(df)将分类列转换为1和0,然后调用to_sparse(fill_value=0),它需要极长的时间,比我预期的一个具有1200万条记录且大部分为零的数值表还要长得多。即使我从原始文件中删除了零并调用to_sparse()(以使填充值为NaN),情况仍然如此。无论我是否传递kind ='block'kind ='integer',这种情况都会发生。

除了手动构建稀疏数据框之外,是否有一种良好而平滑的方法来直接加载稀疏CSV,而不会占用大量不必要的内存?


下面是一些用于创建样本数据集的代码,其中有3列浮点数据和一列文本数据。大约85%的浮点值为零,CSV的总大小约为300 MB,但您可能需要将其变得更大以真正测试内存限制。

np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)

这里有一种简单的方法来阅读它,但希望还有更好、更有效的方法:

sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)

编辑添加(来自JohnE):如果可能,请在您的答案中提供有关读取大型CSV文件时相对性能统计信息,包括有关如何测量内存效率的信息(尤其是内存效率比时钟时间更难以测量)。特别要注意,如果某个答案更加内存高效,则较慢的(时钟时间)答案可能是最佳答案。


你尝试过将字典传递给read_csv的关键字参数dtype吗?我猜如果你将列的数据类型声明为int,这将大大加快to_sparse方法的性能。在这里使用Ctrl+f查找“dtype”。 - user2734178
@user2734178 我尝试了,但很遗憾它并没有起到作用。 - Josephine Moeller
我认为最节省内存的方法是分块读取,然后将每个块转换为稀疏格式。虽然有点麻烦,但这样就永远不会在内存中拥有完整的未压缩数据集了。 - JohnE
@JohnE 是的,我恐怕只能这样做。 :-/ - Josephine Moeller
1
@JohnE 如果您能编写您的答案,那将非常棒。这似乎是最好的方法,但许多用户(包括我)不知道从哪里开始! - famargar
显示剩余5条评论
2个回答

27

我可能会通过使用dask以流式方式加载数据来解决这个问题。例如,您可以按如下方式创建一个dask数据框:

import dask.dataframe as ddf
data = ddf.read_csv('test.csv')

这个data对象实际上还没有做任何事情;它只是一种从磁盘中以可管理的块读取数据框的“配方”。如果您想要具体化数据,可以调用compute()

df = data.compute().reset_index(drop=True)

现在你有一个标准的pandas数据帧(我们称之为reset_index,因为默认情况下每个分区都是独立索引的)。结果等同于直接调用pd.read_csv时获得的结果:

在这一点上,您已经有了一个标准的 pandas 数据帧(我们称之为 reset_index,因为默认情况下每个分区都有自己的索引)。它的结果与直接调用 pd.read_csv 得到的结果相同:

df.equals(pd.read_csv('test.csv'))
# True

dask的好处在于您可以为构建数据框架添加指令;例如,您可以按如下方式使数据的每个分区变得稀疏:

data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))

此时调用compute()将构建一个稀疏数组:

df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame

性能分析

为了比较dask方法和原始的pandas方法,让我们进行一些代码行级别的性能分析。我将使用lprunmprun,如此处所述(完整披露:这是我自己书中的一个部分)。

假设你正在使用Jupyter笔记本电脑,可以按照以下方式运行它:

首先,创建一个单独的文件,其中包含我们要执行的基本任务:

%%file dask_load.py

import numpy as np
import pandas as pd
import dask.dataframe as ddf

def compare_loads():
    df = pd.read_csv('test.csv')
    df_sparse = df.to_sparse(fill_value=0)

    df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    df_dask = df_dask.compute().reset_index(drop=True)

接下来,让我们逐行对计算时间进行分析:

%load_ext line_profiler

from dask_load import compare_loads
%lprun -f compare_loads compare_loads()

我得到了以下结果:

Timer unit: 1e-06 s

Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def compare_loads():
     7         1      4746788 4746788.0     34.1      df = pd.read_csv('test.csv')
     8         1       769303 769303.0      5.5      df_sparse = df.to_sparse(fill_value=0)
     9                                           
    10         1        33992  33992.0      0.2      df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11         1         7848   7848.0      0.1      df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12         1      8348217 8348217.0     60.0      df_dask = df_dask.compute().reset_index(drop=True)

我们可以看到,在上面的示例数组中,大约60%的时间用于dask调用,而大约40%的时间用于pandas调用。这告诉我们,对于这个任务,dask比pandas慢大约50%,这是可以预料的,因为数据分块和重组会导致一些额外的开销。

Dask在内存使用方面表现出色:让我们使用mprun进行逐行内存分析:

%load_ext memory_profiler
%mprun -f compare_loads compare_loads()

在我的电脑上的结果是这样的:

Filename: /Users/jakevdp/dask_load.py

Line #    Mem usage    Increment   Line Contents
================================================
     6     70.9 MiB     70.9 MiB   def compare_loads():
     7    691.5 MiB    620.6 MiB       df = pd.read_csv('test.csv')
     8    828.8 MiB    137.3 MiB       df_sparse = df.to_sparse(fill_value=0)
     9                             
    10    806.3 MiB    -22.5 MiB       df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11    806.4 MiB      0.1 MiB       df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12    947.9 MiB    141.5 MiB       df_dask = df_dask.compute().reset_index(drop=True)

我们可以看到最终的pandas dataframe大小约为140MB左右,但是在读取数据到临时密集对象时,pandas沿途使用了大约620MB。

另一方面,dask在加载数组和构建最终稀疏结果时仅使用了大约140MB的内存。如果您正在读取的数据的密集大小与您系统上可用的内存相当,那么dask具有明显的优势,尽管计算时间会慢约50%左右。


但是对于处理大型数据,您不应该止步于此。假设您正在对数据进行某些操作,dask dataframe抽象允许您在实例化数据之前对这些操作(即将它们添加到“配方”中)进行操作。因此,如果您正在使用数据进行算术、聚合、分组等操作,您甚至不需要担心稀疏存储:只需使用dask对象执行这些操作,最后调用compute(),dask将以内存有效的方式应用它们。

例如,我可以使用dask dataframe计算每列的max(),而无需一次性将整个数据加载到内存中:

>>> data.max().compute()
x      5.38114
y      5.33796
z      5.25661
txt          j
dtype: object

直接使用dask数据框将使您避免对数据表示的担忧,因为您可能永远不必一次性将所有数据加载到内存中。

祝好运!


非常感谢您的回答!我应该指出,我发现很难得到一致的内存定时结果。我回到了您书中相关的章节(《数据科学手册》中的“分析和计时代码”)-- 免费宣传 ;-),大部分时间都在尝试使用%memit,但是仍然无法得到真正一致的结果(我将在我的回答中详细说明)。 - JohnE
在笔记本中使用memit有点棘手 - 在对同一个函数进行第二次分析之前,您需要重新启动内核,否则会得到奇怪的结果。 - jakevdp
好的,我实际上是在qt控制台中进行操作的,如果这有影响的话。等我有足够的时间重置内核之类的时候,我会尝试玩一下这个。 - JohnE
如何在Dask的数据框中迭代行? - aviral sanjay
to_sparse 似乎已经不再是一个事情了。 - Fr4nc3sc0NL

11

这里提供了一个主要作为基准的答案。希望有比这更好的方法。

chunksize = 1000000       # perhaps try some different values here?
chunks = pd.read_csv( 'test.csv', chunksize=chunksize, dtype={'txt':'category'} )
sdf = pd.concat( [ chunk.to_sparse(fill_value=0.0) for chunk in chunks ] )

正如@acushner所指出的那样,您可以将其改写为生成器表达式:

sdf = pd.concat( chunk.to_sparse(fill_value=0.0) for chunk in chunks )

尽管在我的测试中没有看到任何显著差异,但似乎有共识认为这比列表推导式的方法更好。也许对于不同的数据,您可能会看到更大的差异。

我希望能够报告一些关于各种方法的内存分析,但很难得到一致的结果,我怀疑是因为Python总是在幕后清理内存,导致一些随机噪声被添加到结果中。(在对Jake的回答发表评论时,他建议在每个%memit之前重新启动jupyter kernel以获得更一致的结果,但我尚未尝试。)

但是我一直在使用%%memit,发现分块读取和@jakevdp的dask方法都大致上使用了OP中朴素方法的一半左右的内存。更多关于分析的内容,可以参考Jake所写的书《Python数据科学手册》中的“分析和计时代码”一章。


只是提醒一下,您可以创建一个空的 DataFrame,然后将数据附加到其中,这样可以避免同时在内存中保存所有块的问题。 - Josephine Moeller
1
@JohnMoeller 这不是一个好主意。每次你向数据框添加内容时,它都必须重新分配整个数据集以使其连续。另外,使用生成器表达式而不是列表推导式在 concat 调用中。 - acushner
我是指,从概念上和使用Python的角度来看,在这里使用生成器表达式更有意义。如果使用列表推导式,则会创建一个列表,然后立即将其丢弃。此外,生成器表达式更加简洁。你的答案很好,没有理由添加我的答案,我只会出于上述原因使用生成器表达式。 - acushner
1
@johnmoeller 另外一个信息:如果生成器表达式是函数调用的唯一参数,就不需要额外的括号(例如 sum(i for i in range(10)) - acushner

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接