如何在Dask中对连接的数据框重置索引?

5

我对Dask还不熟悉,认为这将是一个简单的任务。我想从多个CSV文件中加载数据并将其组合成一个Dask数据帧。在这个例子中,有5个CSV文件,每个文件中有10,000行数据。显然,我想给组合后的数据帧一个唯一的索引。

于是我做了这个操作:

import dask.dataframe as dd

# Define Dask computations
dataframes = [
    dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
    for filename in os.listdir(data_dir) if filename.endswith('.csv')
]

combined_df = dd.concat(dataframes).reset_index(drop=True)

如果我执行combined_df.head().index,则会得到预期的结果:
RangeIndex(start=0, stop=5, step=1)

但是combined_df.tail().index结果与预期不符:

RangeIndex(start=3252, stop=3257, step=1)

进一步检查发现combined_df的索引值由大约3256个长度为15的分离系列组成,总长度为50000。注意,所有csv文件都包含第一列从0到10000的索引。

这里正在发生什么,我如何获得标准的整数索引从0到50000,这是所有csv文件中的总行数?

背景

如果您需要测试上面的代码,请使用以下设置脚本创建一些csv文件:

import os
import numpy as np
import pandas as pd

# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)

data_dir = 'data'
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

for i in range(5):
    filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
    if not os.path.exists(filepath):
        data = (i + 1) * np.random.randn(shape[0], shape[1])
        print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
        pd.DataFrame(data).to_csv(filepath)

更新:

使用这种方法似乎出现了相同的问题:

combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])

RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)

我认为 reset_index 方法会产生相同的索引。


1
啊,我现在在文档中看到了,它说:“请注意,与pandas不同,重置dask.dataframe索引时,索引不会从0开始单调递增。相反,它将在每个分区中重新启动为0(例如,index1 = [0,...,10],index2 = [0,...])。这是由于无法静态地知道索引的完整长度所致。” - Bill
1个回答

9
在版本中,会在每个分区上分别执行其任务(并行执行),因此索引中的连续数字在某些点上“重新启动”,实际上是在每个分区的开头。 为了解决这个限制,您可以:
  • 分配一个填充了<1>的新列。
  • 将索引设置为在此列上计算的(幸运的是,与相反,是在整个DataFrame上计算的)。
副作用是索引的名称现在是这个新列的名称。 如果要清除它,必须在分区级别上进行调用。
因此,整个代码可以是:
ddf = ddf.assign(idx=1)
ddf = ddf.set_index(ddf.idx.cumsum() - 1)
ddf = ddf.map_partitions(lambda df: df.rename(index = {'idx': None}))

请注意,assign(idx=1) 是可以的,因为这个单一值被广播到整个 DataFrame 的长度,所以这个新列中的每个元素都将设置为 1,而我无需知道 DataFrame 包含多少行。这是底层Numpy软件包的一个杰出特性,它大大简化了在NumpyPandasdask中的编程。
然后您可以运行:ddf.compute()来查看结果。

那么,先将 dask DataFrame 转换为 Pandas 版本,然后再调用 reset_index() 呢?这样,“pandasonic”版本将在整个 DataFrame 上操作,而不是在单个分区上(就像在 dask 中一样)。 - Valdi_Bo
1
我的数据框太大了,无法放入内存。这就是我使用Dask的原因。 - Bill
为什么 ddf = ddf.set_index(np.arange(len(ddf))) 无法正常工作?这通常是我在 Pandas 中替换索引的方法。(它会引发 KeyError 异常)。 - Bill
我担心set_index也被分解为特定分区上的操作。也许dask尝试在每个分区中设置此索引?当你已经有一个(我的)可行解决方案时,为什么要尝试寻找另一个解决方案呢? - Valdi_Bo
尝试使用np.arange分配一个新列,然后你将拥有一个现有的列。然后在此列上调用set_index。但是这个解决方案变得非常类似于我的。唯一的区别是你一次性生成了这个新列,而我设置了一个列为1,然后调用了*cumsum()*。 - Valdi_Bo
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接