Dask和fbprophet

3

我正在尝试同时使用daskfbprophet库,但我要么做错了什么,要么遇到了意外的性能问题。

import dask.dataframe as dd
import datetime as dt
import multiprocessing as mp 
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from fbprophet import Prophet
import time
ncpu = mp.cpu_count()

def parallel_pd(fun, vec, pool = ncpu-1):
    with mp.Pool(pool) as p:
        res = p.map(fun,vec)
    return(res)

def forecast1dd(ts):
    time.sleep(0.1)
    return ts["y"].max()

def forecast1mp(key):
    ts = df[df["key"]==key]
    time.sleep(0.1)
    return ts["y"].max()

def forecast2dd(ts):
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
                                                  periods=7, freq="D")})
    key = ts.name
    model = Prophet(yearly_seasonality=True)
    model.fit(ts)
    forecast = model.predict(future)
    future["yhat"] = forecast["yhat"]
    future["key"] =  key
    return future.as_matrix()

def forecast2mp(key):
    ts = df[df["key"]==key]
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
                                                  periods=7, freq="D")})
    model = Prophet(yearly_seasonality=True)
    model.fit(ts)
    forecast = model.predict(future)
    future["yhat"] = forecast["yhat"]
    future["key"] =  key
    return future.as_matrix()

我有一个自定义函数,约需0.1秒运行时间,因此forecast1ddforecast1mp是对我的函数进行模拟的,对于下面的数据框:

N = 2*365
key_n = 5000
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
                   "y":np.random.normal(100,20,N),
                  "key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)

我分别获得

%%time
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s"))
df1dd = grp.to_frame().compute()
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s
Wall time: 1min 8s

%%time
res = parallel_pd(forecast1mp,keys)
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s
Wall time: 10min 36s

在第一种情况下,核心并未使用100%,但性能与我的实际情况相符。可以使用行分析器轻松检查,第二种情况下性能缓慢的罪魁祸首是ts = df[df["key"]==key],如果我们有更多的键,则情况会变得更糟。
因此,到目前为止我对感到满意。但每当我尝试使用时,情况就会改变。在这里,我使用较少的,但不像之前的情况,的性能总是比差。
N = 2*365
key_n = 200
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
                   "y":np.random.normal(100,20,N),
                  "key":np.repeat(str(k),N)}) for k in range(key_n)])

keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)

%%time
grp = ddf.groupby("key").apply(forecast2dd, 
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s
Wall time: 3min 30s

%%time
res = parallel_pd(forecast2mp,keys)
df2mp = pd.concat([pd.DataFrame(a) for a in res])
CPU times: user 76 ms, sys: 160 ms, total: 236 ms
Wall time: 39.4 s

现在我的问题是:

  • 如何使用dask提高prophet的性能?
  • 我应该怎么做才能让dask使用100%的核心?
1个回答

3

我怀疑Prophet正在持有GIL,所以在计算ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s"))时,只有一个线程可以同时运行Python代码。使用multiprocessing可以规避这个问题,但需要复制你的数据ncpu次。这应该与你的parallel_pd函数具有类似的运行时间。

%%time
with dask.set_options(get=dask.multiprocessing.get):
    grp = ddf.groupby("key").apply(forecast2dd, 
        meta=pd.Series(name="s")).to_frame().compute()

df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])

CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s
Wall time: 1min 27s

您可以尝试向Prophet开发人员询问他们是否需要持有GIL。我怀疑问题出现在PyStan中,当实际的Stan解算器正在运行时,他们可能不需要GIL。这里有一个Github问题(链接) 另外注意:由于您的样本forecast1dd是一个聚合,因此可以使用dd.Aggregation来更快地运行。
%%time

def forcast1dd_chunk(ts):
    time.sleep(0.1)
    return ts.max()

def forecast1dd_agg(ts):
    return ts.max()

f1dd = dd.Aggregation("forecast1dd", forcast1dd_chunk, forecast1dd_agg)

grp = ddf.groupby("key")[['y']].agg(f1dd)
x = grp.compute()

CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms
Wall time: 355 ms

尽管这并不符合您实际的问题,因为它不是一个聚合问题。

1
嗨Tom,我尝试了你的方法以及from dask.distributed import Clientclient = Client(),性能基本相同。问题在于,每当我使用key_n = 5000forecast2dd时,就会出现以下错误:OSError: [Errno 24] Too many open files: '/dev/null' - rpanai
我通过在终端中运行ulimit -Sn 10000解决了“OSError: [Errno 24] Too many open files”这个问题。 - rpanai

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接