Dask能够并行读取CSV文件吗?

32

我正在将一个大型文本文件转换为hdf存储,以期更快地访问数据。 转换效果不错,但是从csv文件读取时未使用并行方式。速度非常慢(在SSD上处理1GB文本文件需要约30分钟,因此我的猜测是它不受IO限制)。

有没有办法让它在多个线程中并行读取?由于可能很重要,所以我目前被迫在Windows下运行 - 以防万一会有任何区别。

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df.categorize([ 'Type',
                'Condition',               
          ])

df.to_hdf("data/data.hdf", "Measurements", 'w')

2
我们在一个非Dask应用程序中也有类似的问题 - 相对容易从CSV文件创建多个块并并行读取它们。请记住,每一行都是一个有效记录。 - Christian Sauer
2个回答

28

是的,dask.dataframe可以并行读取。但是您会遇到两个问题:

Pandas.read_csv仅部分释放GIL

默认情况下,dask.dataframe使用线程并行化,因为大多数Pandas可以在多个线程中并行运行(释放GIL)。 Pandas.read_csv是一个例外,特别是如果您的结果数据框使用对象dtypes进行文本处理。

dask.dataframe.to_hdf(filename)强制顺序计算

写入单个HDF文件将强制进行顺序计算(很难并行写入单个文件)。

编辑:新解决方案

今天我会避免使用HDF,改用Parquet。我可能会使用multiprocessing或dask.distributed调度程序来避免单台机器上的GIL问题。这两者的结合应该可以实现完全的线性扩展。

from dask.distributed import Client
client = Client()

df = dask.dataframe.read_csv(...)
df.to_parquet(...)

解决方案

由于您的数据集可能适合内存,建议使用dask.dataframe.read_csv并行加载多个进程,然后立即切换到Pandas。

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')

希望后续从hdf5文件中读取数据能够超越基于文本的csv文件,非常感谢。我对dask真的很兴奋。 - Magellan88
7
您能否用两种方式进一步扩展这个答案呢?首先,我的csv文件太大而无法全部载入内存。第二,也许更复杂的是,该csv文件被压缩成zip格式,而Dask目前不支持zip格式。在这个讨论中(https://github.com/dask/dask/issues/2554)提到了`dask.delayed`,但我不确定如何将其与`pd.read_csv`和`chunksize`结合使用。谢谢! - tobiasraabe

15
借鉴了@MRocklin的回答,新版dask中,您可以使用df.compute(scheduler='processes')或者df.compute(scheduler='threads')来使用多进程或者多线程转换为pandas数据框:Original Answer.
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(scheduler='processes')     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')

嗨@mgoldwasser,好答案。在单台机器上运行时,这两个选项-processesthreads之间有什么区别? - edesz
3
嗨@edesz-线程共享内存,并受到GIL(全局解释器锁)的限制,而进程则作为单独的进程运行并具有额外的开销。通常情况下,在Python中,多线程由于GIL的限制而表现不佳,除非任务是IO绑定的(例如,如果每个任务都是下载文件)。如果不确定,可以尝试两种方法并查看哪种更快。 - mgoldwasser

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接