我对Dask还不熟悉,认为这将是一个简单的任务。我想从多个CSV文件中加载数据并将其组合成一个Dask数据帧。在这个例子中,有5个CSV文件,每个文件中有10,000行数据。显然,我想给组合后的数据帧一个唯一的索引。
于是我做了这个操作:
import dask.dataframe as dd
# Define Dask computations
dataframes = [
dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
for filename in os.listdir(data_dir) if filename.endswith('.csv')
]
combined_df = dd.concat(dataframes).reset_index(drop=True)
如果我执行
combined_df.head().index
,则会得到预期的结果:RangeIndex(start=0, stop=5, step=1)
但是combined_df.tail().index
结果与预期不符:
RangeIndex(start=3252, stop=3257, step=1)
进一步检查发现combined_df
的索引值由大约3256个长度为15的分离系列组成,总长度为50000。注意,所有csv文件都包含第一列从0到10000的索引。
这里正在发生什么,我如何获得标准的整数索引从0到50000,这是所有csv文件中的总行数?
背景
如果您需要测试上面的代码,请使用以下设置脚本创建一些csv文件:
import os
import numpy as np
import pandas as pd
# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)
data_dir = 'data'
if not os.path.exists(data_dir):
os.mkdir(data_dir)
for i in range(5):
filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
if not os.path.exists(filepath):
data = (i + 1) * np.random.randn(shape[0], shape[1])
print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
pd.DataFrame(data).to_csv(filepath)
更新:
使用这种方法似乎出现了相同的问题:
combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])
RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)
我认为 reset_index
方法会产生相同的索引。
dask.dataframe
索引时,索引不会从0开始单调递增。相反,它将在每个分区中重新启动为0(例如,index1 = [0,...,10],index2 = [0,...]
)。这是由于无法静态地知道索引的完整长度所致。” - Bill