重新采样pandas数据框并对时间序列数据进行插值处理以填充缺失值

4

我需要对时间序列数据进行重新采样,并在每小时的15分钟间隔内插入缺失值。每个ID应该每小时有四行数据。

输入:

ID            Time  Value
1   1/1/2019 12:17      3
1   1/1/2019 12:44      2
2   1/1/2019 12:02      5
2   1/1/2019 12:28      7

输出:
ID                Time  Value
1  2019-01-01 12:00:00    3.0
1  2019-01-01 12:15:00    3.0
1  2019-01-01 12:30:00    2.0
1  2019-01-01 12:45:00    2.0
2  2019-01-01 12:00:00    5.0
2  2019-01-01 12:15:00    7.0
2  2019-01-01 12:30:00    7.0
2  2019-01-01 12:45:00    7.0

我编写了一个函数来实现这个功能,但在尝试处理较大数据集时效率显著下降。
是否有更高效的方法来实现这个功能?
import datetime
import pandas as pd


data = pd.DataFrame({'ID': [1,1,2,2], 
                    'Time': ['1/1/2019 12:17','1/1/2019 12:44','1/1/2019 12:02','1/1/2019 12:28'], 
                    'Value': [3,2,5,7]})


def clean_dataset(data):
    ids = data.drop_duplicates(subset='ID')
    data['Time'] = pd.to_datetime(data['Time'])
    data['Time'] = data['Time'].apply(
    lambda dt: datetime.datetime(dt.year, dt.month, dt.day, dt.hour,15*(dt.minute // 15)))
    data = data.drop_duplicates(subset=['Time','ID']).reset_index(drop=True)
    df = pd.DataFrame(columns=['Time','ID','Value'])
    for i in range(ids.shape[0]):
        times = pd.DataFrame(pd.date_range('1/1/2019 12:00','1/1/2019 13:00',freq='15min'),columns=['Time'])
        id_data = data[data['ID']==ids.iloc[i]['ID']]
        clean_data = times.join(id_data.set_index('Time'), on='Time')
        clean_data = clean_data.interpolate(method='linear', limit_direction='both')
        clean_data.drop(clean_data.tail(1).index,inplace=True)
        df = df.append(clean_data)
    return df


clean_dataset(data)
2个回答

2

线性插值在大数据集下会变得很慢。代码中的循环也是减速的主要原因之一。任何可以从循环中移除并预先计算的内容都有助于提高效率。例如,如果您预定义用于初始化times的数据框,则代码的效率将提高14%:

times_template = pd.DataFrame(pd.date_range('1/1/2019 12:00','1/1/2019 13:00',freq='15min'),columns=['Time'])
for i in range(ids.shape[0]):
    times = times_template.copy()

对代码进行分析确认插值(22.7%)所花费的时间最长,其次是连接(13.1%)、追加(7.71%)和删除(7.67%)命令。


1

您可以使用:

#round datetimes by 15 minutes
data['Time'] = pd.to_datetime(data['Time'])
minutes = pd.to_timedelta(15*(data['Time'].dt.minute // 15), unit='min')
data['Time'] = data['Time'].dt.floor('H') + minutes

#change date range for 4 values (to `12:45`)
rng = pd.date_range('1/1/2019 12:00','1/1/2019 12:45',freq='15min')
#create MultiIndex and reindex
mux = pd.MultiIndex.from_product([data['ID'].unique(), rng], names=['ID','Time'])
data = data.set_index(['ID','Time']).reindex(mux).reset_index()
#interpolate per groups
data['Value'] = (data.groupby('ID')['Value']
                     .apply(lambda x: x.interpolate(method='linear', limit_direction='both')))
print (data)
   ID                Time  Value
0   1 2019-01-01 12:00:00    3.0
1   1 2019-01-01 12:15:00    3.0
2   1 2019-01-01 12:30:00    2.0
3   1 2019-01-01 12:45:00    2.0
4   2 2019-01-01 12:00:00    5.0
5   2 2019-01-01 12:15:00    7.0
6   2 2019-01-01 12:30:00    7.0
7   2 2019-01-01 12:45:00    7.0

如果范围无法更改:

data['Time'] = pd.to_datetime(data['Time'])
minutes = pd.to_timedelta(15*(data['Time'].dt.minute // 15), unit='min')
data['Time'] = data['Time'].dt.floor('H') + minutes

#end in 13:00
rng = pd.date_range('1/1/2019 12:00','1/1/2019 13:00',freq='15min')
mux = pd.MultiIndex.from_product([data['ID'].unique(), rng], names=['ID','Time'])
data = data.set_index(['ID','Time']).reindex(mux).reset_index()
data['Value'] = (data.groupby('ID')['Value']
                     .apply(lambda x: x.interpolate(method='linear', limit_direction='both')))

#remove last row per groups
data = data[data['ID'].duplicated(keep='last')]
print (data)
   ID                Time  Value
0   1 2019-01-01 12:00:00    3.0
1   1 2019-01-01 12:15:00    3.0
2   1 2019-01-01 12:30:00    2.0
3   1 2019-01-01 12:45:00    2.0
5   2 2019-01-01 12:00:00    5.0
6   2 2019-01-01 12:15:00    7.0
7   2 2019-01-01 12:30:00    7.0
8   2 2019-01-01 12:45:00    7.0

编辑:

使用merge和左连接而不是reindex的另一种解决方案:

from  itertools import product

#round datetimes by 15 minutes
data['Time'] = pd.to_datetime(data['Time'])
minutes = pd.to_timedelta(15*(data['Time'].dt.minute // 15), unit='min')
data['Time'] = data['Time'].dt.floor('H') + minutes

#change date range for 4 values (to `12:45`)
rng = pd.date_range('1/1/2019 12:00','1/1/2019 12:45',freq='15min')
#create helper DataFrame and merge with left join
df = pd.DataFrame(list(product(data['ID'].unique(), rng)), columns=['ID','Time'])
print (df)
   ID                Time
0   1 2019-01-01 12:00:00
1   1 2019-01-01 12:15:00
2   1 2019-01-01 12:30:00
3   1 2019-01-01 12:45:00
4   2 2019-01-01 12:00:00
5   2 2019-01-01 12:15:00
6   2 2019-01-01 12:30:00
7   2 2019-01-01 12:45:00

data = df.merge(data, how='left')
##interpolate per groups
data['Value'] = (data.groupby('ID')['Value']
                     .apply(lambda x: x.interpolate(method='linear', limit_direction='both')))
print (data)
   ID                Time  Value
0   1 2019-01-01 12:00:00    3.0
1   1 2019-01-01 12:15:00    3.0
2   1 2019-01-01 12:30:00    2.0
3   1 2019-01-01 12:45:00    2.0
4   2 2019-01-01 12:00:00    5.0
5   2 2019-01-01 12:15:00    7.0
6   2 2019-01-01 12:30:00    7.0
7   2 2019-01-01 12:45:00    7.0

这对于这个任务非常有效,谢谢!但是在测试这个程序时,我遇到了一个问题,当数据集扩大到约6000万行时,解释器被终止了。我猜测这可能是内存问题?您有处理这么大数据集的建议吗? - primo7
1
@primo7 - 6000万行是非常大的DataFrame,因此需要增加RAM。或者使用类似dask的库。 - jezrael
1
@primo7 - 添加了另一个解决方案,请查看。 - jezrael
谢谢!在大型数据框上,合并和左连接方法效果很好。 - primo7

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接