如何使用Dask高效地并行化时间序列预测?

5
我正在尝试使用dask在python中并行化时间序列预测。数据格式是每个时间序列为一列,它们具有月度日期的共同索引。我有一个自定义的预测函数,返回一个包含拟合和预测值的时间序列对象。我想将此函数应用于数据框(所有时间序列的所有列),并返回一个包含所有这些序列的新数据框,以上传到数据库。我已经通过运行以下代码使代码正常工作:
data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
res = ddata.map_partitions(lambda df: df.apply(forecast_func, 
    axis=0)).compute(get=dask.multiprocessing.get)

我的问题是,Dask是否有一种按列而不是按行进行分区的方法,因为在这种情况下,我需要保持有序的时间索引以使预测函数正确工作。
如果没有,我该如何重新格式化数据,以便能够进行高效的大规模预测,并仍然以我需要推送到数据库的格式返回数据? 数据格式示例

1
嗨,戴维斯,你介意分享一下你的性能改进吗?你使用了哪个算法?我做了类似的事情,但我的时间序列是垂直堆叠的,而不是水平的。 - rpanai
1
当然,现在我正在使用https://facebook.github.io/prophet/。通常,在一个时间序列上进行拟合和预测需要3秒钟。 当我只使用1个核心的apply时,处理1000个时间序列大约需要1小时。 因此,所需时间基本上是按核心或工作人员数量减少的(尚未对不同核心与工作人员进行大量测试),因此使用3个核心/工作人员,处理1000个时间序列需要大约15分钟来进行拟合和预测。 - Davis
1
我应该指出,预测函数大约一半的计算时间都花在了调整索引以使数据符合我想要的prophet格式上。但是我想创建一个通用的分布式框架来支持任何预测函数,所以如果你不需要这样做,你可能可以让它更快。这种方法所需的只是您将数据帧格式化,并且您的函数除了返回单个时间序列对象之外,还需要将其作为输入(尽管我不确定不同长度的系列是否会破坏数据帧组合函数)。 - Davis
1
谢谢,我一开始也用了同样的算法,但是性能表现很差。最后我使用了多进程。我会尝试按照你的方法来看看是否会有所改善。关于核心/工作线程,请记住fbprophet使用pystan,因此您不能使用比核心更多的工作线程(请参见GIL)。我期待着看到您的框架(如果在github上,请分享链接)。只有一个注意事项:您是否考虑过如何将时间序列作为列添加回归器 - rpanai
1
没问题,酷!是啊,请让我知道。它目前尚未在Github上,并且可能不会发布一段时间,因为它目前仅限内部使用,而且我刚刚开始工作。对于x个回归器,我已经考虑过了,如果您只关心y的预测,那就没有问题,但是如果您想看到模型对x的预测结果以理解您的模型,那么我同意这将使事情复杂化。我仍在试图概念化当前的通用框架。我还不确定在当前状态下是否理想。 - Davis
显示剩余2条评论
2个回答

4
感谢您的帮助,我非常感激。我使用了dask.delayed解决方案,它运行得非常好,只需要本地集群的三分之一时间。
对于任何有兴趣的人,这是我实现的解决方案:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask

cluster = LocalCluster(n_workers=3,ncores=3)
client = Client(cluster)

#get list of time series back
output = []
for i in small_df:
    forecasted_series = dask.delayed(custom_forecast_func)(small_df[i])
    output.append(forecasted_series)

total = dask.delayed(output).compute()

#combine list of series into 1 dataframe
full_df = pd.concat(total,ignore_index=False,keys=small_df.columns,names=['time_series_names','Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names','Date','value_variable']
final_df.head()

这会给你融合的数据框架结构,如果你想要序列成为列,可以使用以下转换:
pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')

在 Pandas 数据框中,small_df 的格式是这样的,其中 Date 是索引


我尝试了使用dask==2.15.0fbprophet==0.6的方法,使用包含25个时间序列和每个序列1000个观测值的small_df。它有效,并且在使用LocalCluster计算时的时间比不使用dask(即dask.delayed方法约快33%)快大约2/3。接下来,我将25个时间序列增加到50个,再次在我的笔记本电脑上(具有4个内核和8GB)使用LocalCluster运行时很快就耗尽了内存。您是否有使用包含数千个时间序列的large_df,并尝试将其与dask.delayed + fbprophet方法扩展的经验? - edesz

1
Dask dataframe 只按行对数据进行分区。请参阅Dask dataframe 文档Dask array 可以沿任何维度进行分区。但是,您必须使用 Numpy 语义,而不是 Pandas 语义。
您可以使用 dask delayedfutures 进行任何想做的事情。这个并行计算示例在更通用的教程中给出,可能会给您一些想法。

我在考虑,既然所有的系列长度都相同,那么我可以将数据框融合起来,然后按时间序列对象的数量进行分区(使用chunksize=len(series)),然后将该dask数据框应用于函数,并在之后重命名列名。这样做会有1000个分区会有问题吗?或者与列名的顺序有关的问题? - Davis
我并不完全理解这个注释(我对melt或大部分pandas API不熟悉)。一般来说,我不知道有什么聪明的方法可以从dask.dataframe中获得列级并行处理能力。我建议使用dask数组或者dask延迟。成千上万个分区是可以的,但会增加开销。 - MRocklin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接