我可能会通过使用dask以流式方式加载数据来解决这个问题。例如,您可以按如下方式创建一个dask数据框:
import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
这个data
对象实际上还没有做任何事情;它只是一种从磁盘中以可管理的块读取数据框的“配方”。如果您想要具体化数据,可以调用compute()
:
df = data.compute().reset_index(drop=True)
现在你有一个标准的pandas数据帧(我们称之为reset_index
,因为默认情况下每个分区都是独立索引的)。结果等同于直接调用pd.read_csv
时获得的结果:
在这一点上,您已经有了一个标准的 pandas 数据帧(我们称之为 reset_index
,因为默认情况下每个分区都有自己的索引)。它的结果与直接调用 pd.read_csv
得到的结果相同:
df.equals(pd.read_csv('test.csv'))
dask的好处在于您可以为构建数据框架添加指令;例如,您可以按如下方式使数据的每个分区变得稀疏:
data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
此时调用compute()
将构建一个稀疏数组:
df = data.compute().reset_index(drop=True)
type(df)
性能分析
为了比较dask方法和原始的pandas方法,让我们进行一些代码行级别的性能分析。我将使用lprun
和mprun
,如此处所述(完整披露:这是我自己书中的一个部分)。
假设你正在使用Jupyter笔记本电脑,可以按照以下方式运行它:
首先,创建一个单独的文件,其中包含我们要执行的基本任务:
%%file dask_load.py
import numpy as np
import pandas as pd
import dask.dataframe as ddf
def compare_loads():
df = pd.read_csv('test.csv')
df_sparse = df.to_sparse(fill_value=0)
df_dask = ddf.read_csv('test.csv', blocksize=10E6)
df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
df_dask = df_dask.compute().reset_index(drop=True)
接下来,让我们逐行对计算时间进行分析:
%load_ext line_profiler
from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
我得到了以下结果:
Timer unit: 1e-06 s
Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6
Line
==============================================================
6 def compare_loads():
7 1 4746788 4746788.0 34.1 df = pd.read_csv('test.csv')
8 1 769303 769303.0 5.5 df_sparse = df.to_sparse(fill_value=0)
9
10 1 33992 33992.0 0.2 df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 1 7848 7848.0 0.1 df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 1 8348217 8348217.0 60.0 df_dask = df_dask.compute().reset_index(drop=True)
我们可以看到,在上面的示例数组中,大约60%的时间用于dask调用,而大约40%的时间用于pandas调用。这告诉我们,对于这个任务,dask比pandas慢大约50%,这是可以预料的,因为数据分块和重组会导致一些额外的开销。
Dask在内存使用方面表现出色:让我们使用mprun
进行逐行内存分析:
%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
在我的电脑上的结果是这样的:
Filename: /Users/jakevdp/dask_load.py
Line # Mem usage Increment Line Contents
================================================
6 70.9 MiB 70.9 MiB def compare_loads():
7 691.5 MiB 620.6 MiB df = pd.read_csv('test.csv')
8 828.8 MiB 137.3 MiB df_sparse = df.to_sparse(fill_value=0)
9
10 806.3 MiB -22.5 MiB df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 806.4 MiB 0.1 MiB df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 947.9 MiB 141.5 MiB df_dask = df_dask.compute().reset_index(drop=True)
我们可以看到最终的pandas dataframe大小约为140MB左右,但是在读取数据到临时密集对象时,pandas沿途使用了大约620MB。
另一方面,dask在加载数组和构建最终稀疏结果时仅使用了大约140MB的内存。如果您正在读取的数据的密集大小与您系统上可用的内存相当,那么dask具有明显的优势,尽管计算时间会慢约50%左右。
但是对于处理大型数据,您不应该止步于此。假设您正在对数据进行某些操作,dask dataframe抽象允许您在实例化数据之前对这些操作(即将它们添加到“配方”中)进行操作。因此,如果您正在使用数据进行算术、聚合、分组等操作,您甚至不需要担心稀疏存储:只需使用dask对象执行这些操作,最后调用compute()
,dask将以内存有效的方式应用它们。
例如,我可以使用dask dataframe计算每列的max()
,而无需一次性将整个数据加载到内存中:
>>> data.max().compute()
x 5.38114
y 5.33796
z 5.25661
txt j
dtype: object
直接使用dask数据框将使您避免对数据表示的担忧,因为您可能永远不必一次性将所有数据加载到内存中。
祝好运!
read_csv
的关键字参数dtype
吗?我猜如果你将列的数据类型声明为int
,这将大大加快to_sparse
方法的性能。在这里使用Ctrl+f
查找“dtype”。 - user2734178